Skip to content

Commit c048c86

Browse files
authored
Allow optype CREATE for append-only indexing operations (#47169)
Bulk requests currently do not allow adding "create" actions with auto-generated IDs. This commit allows using the optype CREATE for append-only indexing operations. This is mainly the user facing aspect of it.
1 parent 4c90ec6 commit c048c86

File tree

13 files changed

+222
-32
lines changed

13 files changed

+222
-32
lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/IndexingIT.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@
1919
package org.elasticsearch.upgrades;
2020

2121
import org.apache.http.util.EntityUtils;
22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.client.Request;
2324
import org.elasticsearch.client.Response;
25+
import org.elasticsearch.client.ResponseException;
2426
import org.elasticsearch.common.Booleans;
2527
import org.elasticsearch.rest.action.document.RestBulkAction;
2628

2729
import java.io.IOException;
2830
import java.nio.charset.StandardCharsets;
31+
import java.util.Map;
2932

3033
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
34+
import static org.hamcrest.Matchers.containsString;
35+
import static org.hamcrest.Matchers.either;
3136

3237
/**
3338
* Basic test that indexed documents survive the rolling restart. See
@@ -120,6 +125,60 @@ public void testIndexing() throws IOException {
120125
}
121126
}
122127

128+
public void testAutoIdWithOpTypeCreate() throws IOException {
129+
final String indexName = "auto_id_and_op_type_create_index";
130+
StringBuilder b = new StringBuilder();
131+
b.append("{\"create\": {\"_index\": \"").append(indexName).append("\"}}\n");
132+
b.append("{\"f1\": \"v\"}\n");
133+
Request bulk = new Request("POST", "/_bulk");
134+
bulk.addParameter("refresh", "true");
135+
bulk.setJsonEntity(b.toString());
136+
137+
switch (CLUSTER_TYPE) {
138+
case OLD:
139+
Request createTestIndex = new Request("PUT", "/" + indexName);
140+
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0}}");
141+
client().performRequest(createTestIndex);
142+
break;
143+
case MIXED:
144+
Request waitForGreen = new Request("GET", "/_cluster/health");
145+
waitForGreen.addParameter("wait_for_nodes", "3");
146+
client().performRequest(waitForGreen);
147+
148+
Version minNodeVersion = null;
149+
Map<?, ?> response = entityAsMap(client().performRequest(new Request("GET", "_nodes")));
150+
Map<?, ?> nodes = (Map<?, ?>) response.get("nodes");
151+
for (Map.Entry<?, ?> node : nodes.entrySet()) {
152+
Map<?, ?> nodeInfo = (Map<?, ?>) node.getValue();
153+
Version nodeVersion = Version.fromString(nodeInfo.get("version").toString());
154+
if (minNodeVersion == null) {
155+
minNodeVersion = nodeVersion;
156+
} else if (nodeVersion.before(minNodeVersion)) {
157+
minNodeVersion = nodeVersion;
158+
}
159+
}
160+
161+
if (minNodeVersion.before(Version.V_8_0_0)) {
162+
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(bulk));
163+
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
164+
assertThat(e.getMessage(),
165+
// if request goes to 8.0+ node
166+
either(containsString("optype create not supported for indexing requests without explicit id until"))
167+
// if request goes to 7.x node
168+
.or(containsString("an id must be provided if version type or value are set")
169+
));
170+
} else {
171+
client().performRequest(bulk);
172+
}
173+
break;
174+
case UPGRADED:
175+
client().performRequest(bulk);
176+
break;
177+
default:
178+
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
179+
}
180+
}
181+
123182
private void bulk(String index, String valueSuffix, int count) throws IOException {
124183
StringBuilder b = new StringBuilder();
125184
for (int i = 0; i < count; i++) {

rest-api-spec/src/main/resources/rest-api-spec/test/bulk/10_basic.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,50 @@
5757

5858
- match: { count: 2 }
5959

60+
---
61+
"Empty _id with op_type create":
62+
- skip:
63+
version: " - 7.99.99"
64+
reason: "auto id + op type create only supported since 8.0"
65+
66+
- do:
67+
bulk:
68+
refresh: true
69+
body:
70+
- index:
71+
_index: test
72+
_id: ''
73+
- f: 1
74+
- index:
75+
_index: test
76+
_id: id
77+
- f: 2
78+
- index:
79+
_index: test
80+
- f: 3
81+
- create:
82+
_index: test
83+
- f: 4
84+
- index:
85+
_index: test
86+
op_type: create
87+
- f: 5
88+
- match: { errors: true }
89+
- match: { items.0.index.status: 400 }
90+
- match: { items.0.index.error.type: illegal_argument_exception }
91+
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
92+
- match: { items.1.index.result: created }
93+
- match: { items.2.index.result: created }
94+
- match: { items.3.create.result: created }
95+
- match: { items.4.create.result: created }
96+
97+
- do:
98+
count:
99+
index: test
100+
101+
- match: { count: 4 }
102+
103+
60104
---
61105
"empty action":
62106

rest-api-spec/src/main/resources/rest-api-spec/test/bulk/11_basic_with_types.yml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,53 @@
5656

5757
- match: { count: 2 }
5858

59+
---
60+
"Empty _id with op_type create":
61+
- skip:
62+
version: " - 7.99.99"
63+
reason: "auto id + op type create only supported since 8.0"
64+
65+
- do:
66+
bulk:
67+
refresh: true
68+
body:
69+
- index:
70+
_index: test
71+
_type: type
72+
_id: ''
73+
- f: 1
74+
- index:
75+
_index: test
76+
_type: type
77+
_id: id
78+
- f: 2
79+
- index:
80+
_index: test
81+
_type: type
82+
- f: 3
83+
- create:
84+
_index: test
85+
_type: type
86+
- f: 4
87+
- index:
88+
_index: test
89+
op_type: create
90+
- f: 5
91+
- match: { errors: true }
92+
- match: { items.0.index.status: 400 }
93+
- match: { items.0.index.error.type: illegal_argument_exception }
94+
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
95+
- match: { items.1.index.result: created }
96+
- match: { items.2.index.result: created }
97+
- match: { items.3.create.result: created }
98+
- match: { items.4.create.result: created }
99+
100+
- do:
101+
count:
102+
index: test
103+
104+
- match: { count: 4 }
105+
59106
---
60107
"empty action":
61108
- skip:

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,18 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
158158

159159
boolean hasIndexRequestsWithPipelines = false;
160160
final MetaData metaData = clusterService.state().getMetaData();
161+
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
161162
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
162163
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
163164
if (indexRequest != null) {
164165
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
165166
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
166167
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
167168
}
169+
170+
if (actionRequest instanceof IndexRequest) {
171+
((IndexRequest) actionRequest).checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
172+
}
168173
}
169174

170175
if (hasIndexRequestsWithPipelines) {

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,9 @@ public ActionRequestValidationException validate() {
194194
if (contentType == null) {
195195
validationException = addValidationError("content type is missing", validationException);
196196
}
197+
assert opType == OpType.INDEX || opType == OpType.CREATE : "unexpected op-type: " + opType;
197198
final long resolvedVersion = resolveVersionDefaults();
198-
if (opType() == OpType.CREATE) {
199+
if (opType == OpType.CREATE) {
199200
if (versionType != VersionType.INTERNAL) {
200201
validationException = addValidationError("create operations only support internal versioning. use index instead",
201202
validationException);
@@ -215,8 +216,11 @@ public ActionRequestValidationException validate() {
215216
}
216217
}
217218

218-
if (opType() != OpType.INDEX && id == null) {
219-
addValidationError("an id is required for a " + opType() + " operation", validationException);
219+
if (id == null) {
220+
if (versionType != VersionType.INTERNAL ||
221+
(resolvedVersion != Versions.MATCH_DELETED && resolvedVersion != Versions.MATCH_ANY)) {
222+
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
223+
}
220224
}
221225

222226
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
@@ -226,10 +230,6 @@ public ActionRequestValidationException validate() {
226230
id.getBytes(StandardCharsets.UTF_8).length, validationException);
227231
}
228232

229-
if (id == null && (versionType == VersionType.INTERNAL && resolvedVersion == Versions.MATCH_ANY) == false) {
230-
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
231-
}
232-
233233
if (pipeline != null && pipeline.isEmpty()) {
234234
validationException = addValidationError("pipeline cannot be an empty string", validationException);
235235
}
@@ -638,8 +638,16 @@ public void resolveRouting(MetaData metaData) {
638638
routing(metaData.resolveWriteIndexRouting(routing, index));
639639
}
640640

641+
public void checkAutoIdWithOpTypeCreateSupportedByVersion(Version version) {
642+
if (id == null && opType == OpType.CREATE && version.before(Version.V_8_0_0)) {
643+
throw new IllegalArgumentException("optype create not supported for indexing requests without explicit id until all nodes " +
644+
"are on version 8.0.0 or higher");
645+
}
646+
}
647+
641648
@Override
642649
public void writeTo(StreamOutput out) throws IOException {
650+
checkAutoIdWithOpTypeCreateSupportedByVersion(out.getVersion());
643651
super.writeTo(out);
644652
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
645653
// So we use the type accessor method here to make the type non-null (will default it to "_doc").

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,8 @@ private boolean canOptimizeAddDocument(Index index) {
797797
}
798798

799799
protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
800-
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
800+
assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY) &&
801+
index.versionType() == VersionType.INTERNAL
801802
: "version: " + index.version() + " type: " + index.versionType();
802803
return true;
803804
}

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2425
import org.elasticsearch.action.delete.DeleteRequest;
@@ -28,6 +29,7 @@
2829
import org.elasticsearch.cluster.ClusterState;
2930
import org.elasticsearch.cluster.metadata.MetaData;
3031
import org.elasticsearch.cluster.node.DiscoveryNode;
32+
import org.elasticsearch.cluster.node.DiscoveryNodes;
3133
import org.elasticsearch.cluster.service.ClusterService;
3234
import org.elasticsearch.common.unit.TimeValue;
3335
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -107,6 +109,9 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
107109
ClusterState state = mock(ClusterState.class);
108110
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
109111
when(clusterService.state()).thenReturn(state);
112+
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
113+
when(state.getNodes()).thenReturn(discoveryNodes);
114+
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
110115
DiscoveryNode localNode = mock(DiscoveryNode.class);
111116
when(clusterService.localNode()).thenReturn(localNode);
112117
when(localNode.isIngestNode()).thenReturn(randomBoolean());

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ public void setupAction() {
190190
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
191191
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
192192
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
193+
when(nodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
193194
ClusterState state = mock(ClusterState.class);
194195
when(state.getNodes()).thenReturn(nodes);
195196
MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()

server/src/test/java/org/elasticsearch/document/DocumentActionsIT.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,13 @@ public void testBulk() throws Exception {
189189
.add(client().prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")))
190190
.add(client().prepareIndex().setIndex("test").setType("type1").setId("2").setSource(source("2", "test")).setCreate(true))
191191
.add(client().prepareIndex().setIndex("test").setType("type1").setSource(source("3", "test")))
192+
.add(client().prepareIndex().setIndex("test").setType("type1").setCreate(true).setSource(source("4", "test")))
192193
.add(client().prepareDelete().setIndex("test").setType("type1").setId("1"))
193194
.add(client().prepareIndex().setIndex("test").setType("type1").setSource("{ xxx }", XContentType.JSON)) // failure
194195
.execute().actionGet();
195196

196197
assertThat(bulkResponse.hasFailures(), equalTo(true));
197-
assertThat(bulkResponse.getItems().length, equalTo(5));
198+
assertThat(bulkResponse.getItems().length, equalTo(6));
198199

199200
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
200201
assertThat(bulkResponse.getItems()[0].getOpType(), equalTo(OpType.INDEX));
@@ -215,15 +216,21 @@ public void testBulk() throws Exception {
215216
String generatedId3 = bulkResponse.getItems()[2].getId();
216217

217218
assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false));
218-
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.DELETE));
219+
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.CREATE));
219220
assertThat(bulkResponse.getItems()[3].getIndex(), equalTo(getConcreteIndexName()));
220221
assertThat(bulkResponse.getItems()[3].getType(), equalTo("type1"));
221-
assertThat(bulkResponse.getItems()[3].getId(), equalTo("1"));
222+
String generatedId4 = bulkResponse.getItems()[3].getId();
222223

223-
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true));
224-
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.INDEX));
224+
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(false));
225+
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.DELETE));
225226
assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName()));
226227
assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1"));
228+
assertThat(bulkResponse.getItems()[4].getId(), equalTo("1"));
229+
230+
assertThat(bulkResponse.getItems()[5].isFailed(), equalTo(true));
231+
assertThat(bulkResponse.getItems()[5].getOpType(), equalTo(OpType.INDEX));
232+
assertThat(bulkResponse.getItems()[5].getIndex(), equalTo(getConcreteIndexName()));
233+
assertThat(bulkResponse.getItems()[5].getType(), equalTo("type1"));
227234

228235
waitForRelocation(ClusterHealthStatus.GREEN);
229236
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet();
@@ -243,6 +250,10 @@ public void testBulk() throws Exception {
243250
getResult = client().get(getRequest("test").id(generatedId3)).actionGet();
244251
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("3", "test"))));
245252
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));
253+
254+
getResult = client().get(getRequest("test").id(generatedId4)).actionGet();
255+
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("4", "test"))));
256+
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));
246257
}
247258
}
248259

0 commit comments

Comments
 (0)