From 05732b905aed8add1442b10cb8b441660564d29d Mon Sep 17 00:00:00 2001 From: Yu Date: Sat, 22 Jun 2019 22:50:14 +0800 Subject: [PATCH 1/5] Fix versioning for Update Rest API The versioning of Update API doesn't rely on version number anymore (and rather on sequence number now). But in rest api level we ignored and didn't parse the "version" and "version_type" parameter, so that the server doesn't raise the exception even if we set them. This PR restores "version" and "version_type" parsing in Update Rest API so that we can get the appropriate errors. --- docs/reference/docs/update.asciidoc | 22 +-------- .../action/document/RestUpdateAction.java | 2 + .../document/RestUpdateActionTests.java | 49 ++++++++++++++++++- 3 files changed, 51 insertions(+), 22 deletions(-) diff --git a/docs/reference/docs/update.asciidoc b/docs/reference/docs/update.asciidoc index 00cd66232190f..af2bf8e9b0cca 100644 --- a/docs/reference/docs/update.asciidoc +++ b/docs/reference/docs/update.asciidoc @@ -5,8 +5,7 @@ The update API allows to update a document based on a script provided. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and indexes back the result (also allows to delete, or ignore the -operation). It uses versioning to make sure no updates have happened -during the "get" and "reindex". +operation). Note, this operation still means full reindex of the document, it just removes some network roundtrips and reduces chances of version conflicts @@ -333,25 +332,6 @@ Allows to control if and how the updated source should be returned in the respon By default the updated source is not returned. See <> for details. - -`version`:: - -The update API uses the Elasticsearch versioning support internally to make -sure the document doesn't change during the update. You can use the `version` -parameter to specify that the document should only be updated if its version -matches the one specified. - -[NOTE] -.The update API does not support versioning other than internal -===================================================== - -External (version types `external` and `external_gte`) or forced (version type `force`) -versioning is not supported by the update API as it would result in Elasticsearch -version numbers being out of sync with the external system. Use the -<> instead. - -===================================================== - `if_seq_no` and `if_primary_term`:: Update operations can be made conditional and only be performed if the last diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index 804fa61fc53b2..463a18ea6b802 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -83,6 +83,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); + updateRequest.version(RestActions.parseVersion(request)); + updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm())); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java index cea3e9727e275..2731b75d82d9a 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java @@ -19,18 +19,29 @@ package org.elasticsearch.rest.action.document; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest.Method; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.RestActionTestCase; import org.junit.Before; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockito.Mockito.mock; + public class RestUpdateActionTests extends RestActionTestCase { + private RestUpdateAction action; + @Before public void setUpAction() { - new RestUpdateAction(Settings.EMPTY, controller()); + action = new RestUpdateAction(Settings.EMPTY, controller()); } public void testTypeInPath() { @@ -47,4 +58,40 @@ public void testTypeInPath() { .build(); dispatchRequest(validRequest); } + + public void testUpdateDocVersion() { + Map params = Collections.singletonMap("version", "100"); + String content = + "{\n" + + " \"doc\" : {\n" + + " \"name\" : \"new_name\"\n" + + " }\n" + + "}"; + FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.POST) + .withPath("test/_update/1") + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), equalTo("update requests do not support versioning")); + } + + public void testUpdateDocVersionType() { + Map params = Collections.singletonMap("version_type", "internal"); + String content = + "{\n" + + " \"doc\" : {\n" + + " \"name\" : \"new_name\"\n" + + " }\n" + + "}"; + FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.POST) + .withPath("test/_update/1") + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), equalTo("update requests do not support versioning")); + } } From 564227661a514e92cf45bc23d3e76b8daaf884bc Mon Sep 17 00:00:00 2001 From: Yu Date: Mon, 1 Jul 2019 20:46:23 +0800 Subject: [PATCH 2/5] Make version parsing conditional --- .../rest/action/document/RestUpdateAction.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index 463a18ea6b802..394e2fbbcb3a7 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -83,8 +83,12 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); - updateRequest.version(RestActions.parseVersion(request)); - updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); + if (request.hasParam("version")) { + updateRequest.version(RestActions.parseVersion(request)); + } + if (request.hasParam("version_type")) { + updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); + } updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); updateRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", updateRequest.ifPrimaryTerm())); From facd3fbbf4e841d39ee276d4504ff7be7b3213ae Mon Sep 17 00:00:00 2001 From: Yu Date: Sun, 14 Jul 2019 17:32:36 +0800 Subject: [PATCH 3/5] Change exception thrown for update version Change exception thrown from UnsupportedOperationException to ActionRequestValidationException --- .../rest/action/document/RestUpdateAction.java | 11 ++++++----- .../action/document/RestUpdateActionTests.java | 15 ++++++++++----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java index 394e2fbbcb3a7..e89d257aefbca 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestUpdateAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action.document; import org.apache.logging.log4j.LogManager; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.update.UpdateRequest; @@ -83,11 +84,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); - if (request.hasParam("version")) { - updateRequest.version(RestActions.parseVersion(request)); - } - if (request.hasParam("version_type")) { - updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); + if (request.hasParam("version") || request.hasParam("version_type")) { + final ActionRequestValidationException versioningError = new ActionRequestValidationException(); + versioningError.addValidationError("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead"); + throw versioningError; } updateRequest.setIfSeqNo(request.paramAsLong("if_seq_no", updateRequest.ifSeqNo())); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java index 2731b75d82d9a..621d9907c81a9 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.rest.action.document; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; @@ -32,7 +33,7 @@ import java.util.Collections; import java.util.Map; -import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.containsString; import static org.mockito.Mockito.mock; public class RestUpdateActionTests extends RestActionTestCase { @@ -73,8 +74,10 @@ public void testUpdateDocVersion() { .withParams(params) .withContent(new BytesArray(content), XContentType.JSON) .build(); - UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); - assertThat(e.getMessage(), equalTo("update requests do not support versioning")); + ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead")); } public void testUpdateDocVersionType() { @@ -91,7 +94,9 @@ public void testUpdateDocVersionType() { .withParams(params) .withContent(new BytesArray(content), XContentType.JSON) .build(); - UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); - assertThat(e.getMessage(), equalTo("update requests do not support versioning")); + ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead")); } } From 9df31358e2d82b6ce9f31b5aea5f0c2b86f3ee36 Mon Sep 17 00:00:00 2001 From: Yu Date: Mon, 15 Jul 2019 12:06:28 +0800 Subject: [PATCH 4/5] Update tests --- .../document/RestUpdateActionTests.java | 94 +++++++++++-------- 1 file changed, 57 insertions(+), 37 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java index 621d9907c81a9..2f0014f9a35a1 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java @@ -61,42 +61,62 @@ public void testTypeInPath() { } public void testUpdateDocVersion() { - Map params = Collections.singletonMap("version", "100"); - String content = - "{\n" + - " \"doc\" : {\n" + - " \"name\" : \"new_name\"\n" + - " }\n" + - "}"; - FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) - .withMethod(RestRequest.Method.POST) - .withPath("test/_update/1") - .withParams(params) - .withContent(new BytesArray(content), XContentType.JSON) - .build(); - ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, - () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); - assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + - "Please use `if_seq_no` and `if_primary_term` instead")); - } - - public void testUpdateDocVersionType() { - Map params = Collections.singletonMap("version_type", "internal"); - String content = - "{\n" + - " \"doc\" : {\n" + - " \"name\" : \"new_name\"\n" + - " }\n" + - "}"; - FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) - .withMethod(RestRequest.Method.POST) - .withPath("test/_update/1") - .withParams(params) - .withContent(new BytesArray(content), XContentType.JSON) - .build(); - ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, - () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); - assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + - "Please use `if_seq_no` and `if_primary_term` instead")); + { + Map params = Collections.singletonMap("version", "100"); + String content = + "{\n" + + " \"doc\" : {\n" + + " \"name\" : \"new_name\"\n" + + " }\n" + + "}"; + FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.POST) + .withPath("test/_update/1") + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead")); + } + { + Map params = Collections.singletonMap("version_type", "internal"); + String content = + "{\n" + + " \"doc\" : {\n" + + " \"name\" : \"new_name\"\n" + + " }\n" + + "}"; + FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.POST) + .withPath("test/_update/1") + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead")); + } + { + Map params = Map.of("version", "100", "version_type", "external"); + String content = + "{\n" + + " \"doc\" : {\n" + + " \"name\" : \"new_name\"\n" + + " }\n" + + "}"; + FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.POST) + .withPath("test/_update/1") + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead")); + } } } From e64daa934feb94096816b18049f526428e085add Mon Sep 17 00:00:00 2001 From: Yu Date: Tue, 16 Jul 2019 21:45:25 +0800 Subject: [PATCH 5/5] update tests --- .../document/RestUpdateActionTests.java | 83 ++++++------------- 1 file changed, 26 insertions(+), 57 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java index 2f0014f9a35a1..119057a66d93c 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestUpdateActionTests.java @@ -24,13 +24,14 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest.Method; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.RestActionTestCase; import org.junit.Before; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.hamcrest.CoreMatchers.containsString; @@ -61,62 +62,30 @@ public void testTypeInPath() { } public void testUpdateDocVersion() { - { - Map params = Collections.singletonMap("version", "100"); - String content = - "{\n" + - " \"doc\" : {\n" + - " \"name\" : \"new_name\"\n" + - " }\n" + - "}"; - FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) - .withMethod(RestRequest.Method.POST) - .withPath("test/_update/1") - .withParams(params) - .withContent(new BytesArray(content), XContentType.JSON) - .build(); - ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, - () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); - assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + - "Please use `if_seq_no` and `if_primary_term` instead")); - } - { - Map params = Collections.singletonMap("version_type", "internal"); - String content = - "{\n" + - " \"doc\" : {\n" + - " \"name\" : \"new_name\"\n" + - " }\n" + - "}"; - FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) - .withMethod(RestRequest.Method.POST) - .withPath("test/_update/1") - .withParams(params) - .withContent(new BytesArray(content), XContentType.JSON) - .build(); - ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, - () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); - assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + - "Please use `if_seq_no` and `if_primary_term` instead")); - } - { - Map params = Map.of("version", "100", "version_type", "external"); - String content = - "{\n" + - " \"doc\" : {\n" + - " \"name\" : \"new_name\"\n" + - " }\n" + - "}"; - FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) - .withMethod(RestRequest.Method.POST) - .withPath("test/_update/1") - .withParams(params) - .withContent(new BytesArray(content), XContentType.JSON) - .build(); - ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, - () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); - assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + - "Please use `if_seq_no` and `if_primary_term` instead")); + Map params = new HashMap<>(); + if (randomBoolean()) { + params.put("version", Long.toString(randomNonNegativeLong())); + params.put("version_type", randomFrom(VersionType.values()).name()); + } else if (randomBoolean()) { + params.put("version", Long.toString(randomNonNegativeLong())); + } else { + params.put("version_type", randomFrom(VersionType.values()).name()); } + String content = + "{\n" + + " \"doc\" : {\n" + + " \"name\" : \"new_name\"\n" + + " }\n" + + "}"; + FakeRestRequest updateRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withMethod(RestRequest.Method.POST) + .withPath("test/_update/1") + .withParams(params) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class, + () -> action.prepareRequest(updateRequest, mock(NodeClient.class))); + assertThat(e.getMessage(), containsString("internal versioning can not be used for optimistic concurrency control. " + + "Please use `if_seq_no` and `if_primary_term` instead")); } }