Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
package org.elasticsearch.upgrades;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.rest.action.document.RestBulkAction;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;

/**
* Basic test that indexed documents survive the rolling restart. See
Expand Down Expand Up @@ -120,6 +125,60 @@ public void testIndexing() throws IOException {
}
}

public void testAutoIdWithOpTypeCreate() throws IOException {
final String indexName = "auto_id_and_op_type_create_index";
StringBuilder b = new StringBuilder();
b.append("{\"create\": {\"_index\": \"").append(indexName).append("\"}}\n");
b.append("{\"f1\": \"v\"}\n");
Request bulk = new Request("POST", "/_bulk");
bulk.addParameter("refresh", "true");
bulk.setJsonEntity(b.toString());

switch (CLUSTER_TYPE) {
case OLD:
Request createTestIndex = new Request("PUT", "/" + indexName);
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0}}");
client().performRequest(createTestIndex);
break;
case MIXED:
Request waitForGreen = new Request("GET", "/_cluster/health");
waitForGreen.addParameter("wait_for_nodes", "3");
client().performRequest(waitForGreen);

Version minNodeVersion = null;
Map<?, ?> response = entityAsMap(client().performRequest(new Request("GET", "_nodes")));
Map<?, ?> nodes = (Map<?, ?>) response.get("nodes");
for (Map.Entry<?, ?> node : nodes.entrySet()) {
Map<?, ?> nodeInfo = (Map<?, ?>) node.getValue();
Version nodeVersion = Version.fromString(nodeInfo.get("version").toString());
if (minNodeVersion == null) {
minNodeVersion = nodeVersion;
} else if (nodeVersion.before(minNodeVersion)) {
minNodeVersion = nodeVersion;
}
}

if (minNodeVersion.before(Version.V_8_0_0)) {
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(bulk));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(),
// if request goes to 8.0+ node
either(containsString("optype create not supported for indexing requests without explicit id until"))
// if request goes to 7.x node
.or(containsString("an id must be provided if version type or value are set")
));
} else {
client().performRequest(bulk);
}
break;
case UPGRADED:
client().performRequest(bulk);
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}
}

private void bulk(String index, String valueSuffix, int count) throws IOException {
StringBuilder b = new StringBuilder();
for (int i = 0; i < count; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,50 @@

- match: { count: 2 }

---
"Empty _id with op_type create":
- skip:
version: " - 7.99.99"
reason: "auto id + op type create only supported since 8.0"

- do:
bulk:
refresh: true
body:
- index:
_index: test
_id: ''
- f: 1
- index:
_index: test
_id: id
- f: 2
- index:
_index: test
- f: 3
- create:
_index: test
- f: 4
- index:
_index: test
op_type: create
- f: 5
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: illegal_argument_exception }
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
- match: { items.1.index.result: created }
- match: { items.2.index.result: created }
- match: { items.3.create.result: created }
- match: { items.4.create.result: created }

- do:
count:
index: test

- match: { count: 4 }


---
"empty action":

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,53 @@

- match: { count: 2 }

---
"Empty _id with op_type create":
- skip:
version: " - 7.99.99"
reason: "auto id + op type create only supported since 8.0"

- do:
bulk:
refresh: true
body:
- index:
_index: test
_type: type
_id: ''
- f: 1
- index:
_index: test
_type: type
_id: id
- f: 2
- index:
_index: test
_type: type
- f: 3
- create:
_index: test
_type: type
- f: 4
- index:
_index: test
op_type: create
- f: 5
- match: { errors: true }
- match: { items.0.index.status: 400 }
- match: { items.0.index.error.type: illegal_argument_exception }
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
- match: { items.1.index.result: created }
- match: { items.2.index.result: created }
- match: { items.3.create.result: created }
- match: { items.4.create.result: created }

- do:
count:
index: test

- match: { count: 4 }

---
"empty action":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,18 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk

boolean hasIndexRequestsWithPipelines = false;
final MetaData metaData = clusterService.state().getMetaData();
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}

if (actionRequest instanceof IndexRequest) {
((IndexRequest) actionRequest).checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
}
}

if (hasIndexRequestsWithPipelines) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ public ActionRequestValidationException validate() {
if (contentType == null) {
validationException = addValidationError("content type is missing", validationException);
}
assert opType == OpType.INDEX || opType == OpType.CREATE : "unexpected op-type: " + opType;
final long resolvedVersion = resolveVersionDefaults();
if (opType() == OpType.CREATE) {
if (opType == OpType.CREATE) {
if (versionType != VersionType.INTERNAL) {
validationException = addValidationError("create operations only support internal versioning. use index instead",
validationException);
Expand All @@ -215,8 +216,11 @@ public ActionRequestValidationException validate() {
}
}

if (opType() != OpType.INDEX && id == null) {
addValidationError("an id is required for a " + opType() + " operation", validationException);
if (id == null) {
if (versionType != VersionType.INTERNAL ||
(resolvedVersion != Versions.MATCH_DELETED && resolvedVersion != Versions.MATCH_ANY)) {
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
}
}

validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
Expand All @@ -226,10 +230,6 @@ public ActionRequestValidationException validate() {
id.getBytes(StandardCharsets.UTF_8).length, validationException);
}

if (id == null && (versionType == VersionType.INTERNAL && resolvedVersion == Versions.MATCH_ANY) == false) {
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
}

if (pipeline != null && pipeline.isEmpty()) {
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}
Expand Down Expand Up @@ -638,8 +638,16 @@ public void resolveRouting(MetaData metaData) {
routing(metaData.resolveWriteIndexRouting(routing, index));
}

public void checkAutoIdWithOpTypeCreateSupportedByVersion(Version version) {
if (id == null && opType == OpType.CREATE && version.before(Version.V_8_0_0)) {
throw new IllegalArgumentException("optype create not supported for indexing requests without explicit id until all nodes " +
"are on version 8.0.0 or higher");
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
checkAutoIdWithOpTypeCreateSupportedByVersion(out.getVersion());
super.writeTo(out);
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
// So we use the type accessor method here to make the type non-null (will default it to "_doc").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,8 @@ private boolean canOptimizeAddDocument(Index index) {
}

protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY) &&
index.versionType() == VersionType.INTERNAL
: "version: " + index.version() + " type: " + index.versionType();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
Expand All @@ -28,6 +29,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
Expand Down Expand Up @@ -107,6 +109,9 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
ClusterState state = mock(ClusterState.class);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(state.getNodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(clusterService.localNode()).thenReturn(localNode);
when(localNode.isIngestNode()).thenReturn(randomBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public void setupAction() {
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
when(nodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,13 @@ public void testBulk() throws Exception {
.add(client().prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")))
.add(client().prepareIndex().setIndex("test").setType("type1").setId("2").setSource(source("2", "test")).setCreate(true))
.add(client().prepareIndex().setIndex("test").setType("type1").setSource(source("3", "test")))
.add(client().prepareIndex().setIndex("test").setType("type1").setCreate(true).setSource(source("4", "test")))
.add(client().prepareDelete().setIndex("test").setType("type1").setId("1"))
.add(client().prepareIndex().setIndex("test").setType("type1").setSource("{ xxx }", XContentType.JSON)) // failure
.execute().actionGet();

assertThat(bulkResponse.hasFailures(), equalTo(true));
assertThat(bulkResponse.getItems().length, equalTo(5));
assertThat(bulkResponse.getItems().length, equalTo(6));

assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[0].getOpType(), equalTo(OpType.INDEX));
Expand All @@ -215,15 +216,21 @@ public void testBulk() throws Exception {
String generatedId3 = bulkResponse.getItems()[2].getId();

assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.DELETE));
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.CREATE));
assertThat(bulkResponse.getItems()[3].getIndex(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.getItems()[3].getType(), equalTo("type1"));
assertThat(bulkResponse.getItems()[3].getId(), equalTo("1"));
String generatedId4 = bulkResponse.getItems()[3].getId();

assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true));
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.INDEX));
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.DELETE));
assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1"));
assertThat(bulkResponse.getItems()[4].getId(), equalTo("1"));

assertThat(bulkResponse.getItems()[5].isFailed(), equalTo(true));
assertThat(bulkResponse.getItems()[5].getOpType(), equalTo(OpType.INDEX));
assertThat(bulkResponse.getItems()[5].getIndex(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.getItems()[5].getType(), equalTo("type1"));

waitForRelocation(ClusterHealthStatus.GREEN);
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet();
Expand All @@ -243,6 +250,10 @@ public void testBulk() throws Exception {
getResult = client().get(getRequest("test").id(generatedId3)).actionGet();
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("3", "test"))));
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));

getResult = client().get(getRequest("test").id(generatedId4)).actionGet();
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("4", "test"))));
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));
}
}

Expand Down
Loading