Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
Expand Down Expand Up @@ -885,6 +886,20 @@ Params withVersionType(VersionType versionType) {
return this;
}

Params withIfSeqNo(long ifSeqNo) {
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
return putParam("if_seq_no", Long.toString(ifSeqNo));
}
return this;
}

Params withIfPrimaryTerm(long ifPrimaryTerm) {
if (ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
return putParam("if_primary_term", Long.toString(ifPrimaryTerm));
}
return this;
}

Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ static Request putWatch(PutWatchRequest putWatchRequest) {
.build();

Request request = new Request(HttpPut.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request).withVersion(putWatchRequest.getVersion());
RequestConverters.Params params = new RequestConverters.Params(request)
.withVersion(putWatchRequest.getVersion())
.withIfSeqNo(putWatchRequest.ifSeqNo())
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
if (putWatchRequest.isActive() == false) {
params.putParam("active", "false");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

public class GetWatchResponse {
private final String id;
private final long version;
private final long seqNo;
private final long primaryTerm;
private final WatchStatus status;

private final BytesReference source;
Expand All @@ -43,15 +48,18 @@ public class GetWatchResponse {
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
this(id, Versions.NOT_FOUND, null, null, null);
this(id, Versions.NOT_FOUND, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, null, null, null);
}

public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status,
BytesReference source, XContentType xContentType) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

public String getId() {
Expand All @@ -62,6 +70,14 @@ public long getVersion() {
return version;
}

public long getSeqNo() {
return seqNo;
}

public long getPrimaryTerm() {
return primaryTerm;
}

public boolean isFound() {
return version != Versions.NOT_FOUND;
}
Expand Down Expand Up @@ -111,6 +127,8 @@ public int hashCode() {
private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField SEQ_NO_FIELD = new ParseField("_seq_no");
private static final ParseField PRIMARY_TERM_FIELD = new ParseField("_primary_term");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");

Expand All @@ -119,9 +137,10 @@ public int hashCode() {
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
XContentBuilder builder = (XContentBuilder) a[6];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
return new GetWatchResponse((String) a[0], (long) a[2], (long) a[3], (long) a[4], (WatchStatus) a[5],
source, builder.contentType());
} else {
return new GetWatchResponse((String) a[0]);
}
Expand All @@ -131,6 +150,8 @@ public int hashCode() {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), SEQ_NO_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIMARY_TERM_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.util.Objects;
import java.util.regex.Pattern;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
* This request class contains the data needed to create a watch along with the name of the watch.
* The name of the watch will become the ID of the indexed document.
Expand All @@ -40,6 +44,9 @@ public final class PutWatchRequest implements Validatable {
private final XContentType xContentType;
private boolean active = true;
private long version = Versions.MATCH_ANY;
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;


public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
Objects.requireNonNull(id, "watch id is missing");
Expand Down Expand Up @@ -96,6 +103,56 @@ public void setVersion(long version) {
this.version = version;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the watch's last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfSeqNo(long seqNo) {
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
}
ifSeqNo = seqNo;
return this;
}

/**
* only performs this put request if the watch's last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the watch last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public PutWatchRequest setIfPrimaryTerm(long term) {
if (term < 0) {
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
}
ifPrimaryTerm = term;
return this;
}

/**
* If set, only perform this put watch request if the watch's last modification was assigned this sequence number.
* If the watch last last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifSeqNo() {
return ifSeqNo;
}

/**
* If set, only perform this put watch request if the watch's last modification was assigned this primary term.
*
* If the watch's last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
public long ifPrimaryTerm() {
return ifPrimaryTerm;
}


public static boolean isValidId(String id) {
return Strings.isEmpty(id) == false && NO_WS_PATTERN.matcher(id).matches();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -32,20 +33,26 @@ public class PutWatchResponse {

static {
PARSER.declareString(PutWatchResponse::setId, new ParseField("_id"));
PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no"));
PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term"));
PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version"));
PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created"));
}

private String id;
private long version;
private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
private boolean created;

public PutWatchResponse() {
}

public PutWatchResponse(String id, long version, boolean created) {
public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) {
this.id = id;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.created = created;
}

Expand All @@ -57,6 +64,14 @@ private void setVersion(long version) {
this.version = version;
}

private void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}

private void setPrimaryTerm(long primaryTerm) {
this.primaryTerm = primaryTerm;
}

private void setCreated(boolean created) {
this.created = created;
}
Expand All @@ -69,6 +84,14 @@ public long getVersion() {
return version;
}

public long getSeqNo() {
return seqNo;
}

public long getPrimaryTerm() {
return primaryTerm;
}

public boolean isCreated() {
return created;
}
Expand All @@ -80,12 +103,14 @@ public boolean equals(Object o) {

PutWatchResponse that = (PutWatchResponse) o;

return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created);
return Objects.equals(id, that.id) && Objects.equals(version, that.version)
&& Objects.equals(seqNo, that.seqNo)
&& Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created);
}

@Override
public int hashCode() {
return Objects.hash(id, version, created);
return Objects.hash(id, version, seqNo, primaryTerm, created);
}

public static PutWatchResponse fromXContent(XContentParser parser) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ private static XContentBuilder toXContent(PutWatchResponse response, XContentBui
return builder.startObject()
.field("_id", response.getId())
.field("_version", response.getVersion())
.field("_seq_no", response.getSeqNo())
.field("_primary_term", response.getPrimaryTerm())
.field("created", response.isCreated())
.endObject();
}

private static PutWatchResponse createTestInstance() {
String id = randomAlphaOfLength(10);
long seqNo = randomNonNegativeLong();
long primaryTerm = randomLongBetween(1, 200);
long version = randomLongBetween(1, 10);
boolean created = randomBoolean();
return new PutWatchResponse(id, version, created);
return new PutWatchResponse(id, version, seqNo, primaryTerm, created);
}
}
6 changes: 6 additions & 0 deletions x-pack/docs/en/rest-api/watcher/ack-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ The action state of a newly-created watch is `awaits_successful_execution`:
--------------------------------------------------
{
"found": true,
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"_id": "my_watch",
"status": {
Expand Down Expand Up @@ -137,6 +139,8 @@ and the action is now in `ackable` state:
{
"found": true,
"_id": "my_watch",
"_seq_no": 1,
"_primary_term": 1,
"_version": 2,
"status": {
"version": 2,
Expand Down Expand Up @@ -186,6 +190,8 @@ GET _watcher/watch/my_watch
{
"found": true,
"_id": "my_watch",
"_seq_no": 2,
"_primary_term": 1,
"_version": 3,
"status": {
"version": 3,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/docs/en/rest-api/watcher/activate-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ GET _watcher/watch/my_watch
{
"found": true,
"_id": "my_watch",
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"status": {
"state" : {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/docs/en/rest-api/watcher/deactivate-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ GET _watcher/watch/my_watch
"found": true,
"_id": "my_watch",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"status": {
"state" : {
"active" : true,
Expand Down
2 changes: 2 additions & 0 deletions x-pack/docs/en/rest-api/watcher/get-watch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Response:
{
"found": true,
"_id": "my_watch",
"_seq_no": 0,
"_primary_term": 1,
"_version": 1,
"status": { <1>
"version": 1,
Expand Down
Loading