Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class BulkItemRequest implements Streamable {

}

protected BulkItemRequest(int id, DocWriteRequest request) {
// NOTE: public for testing only
public BulkItemRequest(int id, DocWriteRequest request) {
this.id = id;
this.request = request;
}
Expand All @@ -56,13 +57,11 @@ public String index() {
return request.indices()[0];
}

// NOTE: protected for testing only
protected BulkItemResponse getPrimaryResponse() {
BulkItemResponse getPrimaryResponse() {
return primaryResponse;
}

// NOTE: protected for testing only
protected void setPrimaryResponse(BulkItemResponse primaryResponse) {
void setPrimaryResponse(BulkItemResponse primaryResponse) {
this.primaryResponse = primaryResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
Expand Down Expand Up @@ -171,17 +173,34 @@ public static class Failure implements Writeable, ToXContent {
private final String id;
private final Exception cause;
private final RestStatus status;
private final long seqNo;

Failure(String index, String type, String id, Exception cause, RestStatus status) {
/**
* For write failures before operation was assigned a sequence number.
*
* use @{link {@link #Failure(String, String, String, Exception, long)}}
* to record operation sequence no with failure
*/
public Failure(String index, String type, String id, Exception cause) {
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO);
}

public Failure(String index, String type, String id, Exception cause, RestStatus status) {
this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO);
}

/** For write failures after operation was assigned a sequence number. */
public Failure(String index, String type, String id, Exception cause, long seqNo) {
this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo);
}

public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo) {
this.index = index;
this.type = type;
this.id = id;
this.cause = cause;
this.status = status;
}

public Failure(String index, String type, String id, Exception cause) {
this(index, type, id, cause, ExceptionsHelper.status(cause));
this.seqNo = seqNo;
}

/**
Expand All @@ -193,6 +212,11 @@ public Failure(StreamInput in) throws IOException {
id = in.readOptionalString();
cause = in.readException();
status = ExceptionsHelper.status(cause);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

@Override
Expand All @@ -201,6 +225,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(getType());
out.writeOptionalString(getId());
out.writeException(getCause());
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(getSeqNo());
}
}


Expand Down Expand Up @@ -246,6 +273,15 @@ public Exception getCause() {
return cause;
}

/**
* The operation sequence number generated by primary
* NOTE: {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* indicates sequence number was not generated by primary
*/
public long getSeqNo() {
return seqNo;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INDEX_FIELD, index);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ protected abstract WriteReplicaResult<ReplicaRequest> shardOperationOnReplica(

/**
* Result of taking the action on the primary.
*
* NOTE: public for testing
*/
protected static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
public static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response>
implements RespondingWriteResult {
boolean finishedAsyncActions;
Expand Down
30 changes: 18 additions & 12 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ public Operation.TYPE getOperationType() {

void setTranslogLocation(Translog.Location translogLocation) {
if (freeze.get() == null) {
assert failure == null : "failure has to be null to set translog location";
this.translogLocation = translogLocation;
} else {
throw new IllegalStateException("result is already frozen");
Expand Down Expand Up @@ -432,7 +431,7 @@ public boolean isFound() {

}

static class NoOpResult extends Result {
public static class NoOpResult extends Result {

NoOpResult(long seqNo) {
super(Operation.TYPE.NO_OP, 0, seqNo);
Expand Down Expand Up @@ -1154,24 +1153,31 @@ public String reason() {
return reason;
}

public NoOp(
final Term uid,
final long seqNo,
final long primaryTerm,
final long version,
final VersionType versionType,
final Origin origin,
final long startTime,
final String reason) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) {
super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime);
this.reason = reason;
}

@Override
public Term uid() {
throw new UnsupportedOperationException();
}

@Override
public String type() {
throw new UnsupportedOperationException();
}

@Override
public long version() {
throw new UnsupportedOperationException();
}

@Override
public VersionType versionType() {
throw new UnsupportedOperationException();
}

@Override
String id() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,16 @@ public IndexResult index(Index index) throws IOException {
indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing,
plan.currentNotFoundOrDeleted);
}
if (indexResult.hasFailure() == false &&
index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
Translog.Location location =
translog.add(new Translog.Index(index, indexResult));
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (indexResult.hasFailure() == false) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog with the generated seq_no
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
Expand Down Expand Up @@ -749,7 +755,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
* we return a `MATCH_ANY` version to indicate no document was index. The value is
* not used anyway
*/
return new IndexResult(ex, Versions.MATCH_ANY, index.seqNo());
return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
} else {
throw ex;
}
Expand Down Expand Up @@ -900,10 +906,16 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion,
plan.currentlyDeleted == false);
}
if (!deleteResult.hasFailure() &&
delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
Translog.Location location =
translog.add(new Translog.Delete(delete, deleteResult));
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (deleteResult.hasFailure() == false) {
location = translog.add(new Translog.Delete(delete, deleteResult));
} else if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
delete.primaryTerm(), deleteResult.getFailure().getMessage()));
} else {
location = null;
}
deleteResult.setTranslogLocation(location);
}
if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
Expand Down
33 changes: 23 additions & 10 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -569,28 +569,34 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
return result;
}

public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow the standard that we have use here, I think this should be named prepareNoOpOnReplica(...) (since we have prepareDeleteOnReplica and prepareIndexOnReplica)

verifyReplicationTarget();
long startTime = System.nanoTime();
return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason);
}

public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the other IndexShard methods correspond to their Engine implementations, so I think this should be called just noOp

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my ask. I find IndexShard.noOp to be confusing - why are we doing nothing?. I would be good with renaming the engine method to match the one here. @dakrone would you be good with that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's reasonable to me!

ensureWriteAllowed(noOp);
Engine engine = getEngine();
return engine.noOp(noOp);
}

public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
verifyPrimary();
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType();
final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null);
final Term uid = MappedFieldType.extractTerm(uidQuery);
final Term uid = extractUid(type, id);
return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version,
versionType, Engine.Operation.Origin.PRIMARY);
}

public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm,
long version, VersionType versionType) {
verifyReplicationTarget();
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType();
final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null);
final Term uid = MappedFieldType.extractTerm(uidQuery);
final Term uid = extractUid(type, id);
return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA);
}

static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
VersionType versionType, Engine.Operation.Origin origin) {
private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
VersionType versionType, Engine.Operation.Origin origin) {
long startTime = System.nanoTime();
return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime);
}
Expand All @@ -601,6 +607,13 @@ public Engine.DeleteResult delete(Engine.Delete delete) throws IOException {
return delete(engine, delete);
}

private Term extractUid(String type, String id) {
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType();
final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null);
return MappedFieldType.extractTerm(uidQuery);
}

private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException {
active.set(true);
final Engine.DeleteResult result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -182,7 +180,7 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio
final String reason = noOp.reason();
logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
final Engine.NoOp engineNoOp =
new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason);
new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason);
noOp(engine, engineNoOp);
break;
default:
Expand Down
Loading