Skip to content

Commit 7b2613d

Browse files
committed
Allow optype CREATE for append-only indexing operations (#47169)
Bulk requests currently do not allow adding "create" actions with auto-generated IDs. This commit allows using the optype CREATE for append-only indexing operations. This is mainly the user facing aspect of it.
1 parent a032f9b commit 7b2613d

File tree

13 files changed

+224
-35
lines changed

13 files changed

+224
-35
lines changed

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/IndexingIT.java

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,23 @@
1919
package org.elasticsearch.upgrades;
2020

2121
import org.apache.http.util.EntityUtils;
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.client.Request;
24+
import org.elasticsearch.client.Response;
25+
import org.elasticsearch.client.ResponseException;
2226
import org.elasticsearch.common.Booleans;
2327
import org.elasticsearch.common.Strings;
2428
import org.elasticsearch.common.xcontent.XContentBuilder;
2529
import org.elasticsearch.rest.action.document.RestBulkAction;
26-
import org.elasticsearch.Version;
27-
import org.elasticsearch.client.Request;
28-
import org.elasticsearch.client.Response;
2930

3031
import java.io.IOException;
3132
import java.nio.charset.StandardCharsets;
33+
import java.util.Map;
3234

3335
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
3436
import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
37+
import static org.hamcrest.Matchers.containsString;
38+
import static org.hamcrest.Matchers.either;
3539
import static org.hamcrest.Matchers.equalTo;
3640

3741
/**
@@ -145,6 +149,60 @@ public void testIndexing() throws IOException {
145149
}
146150
}
147151

152+
public void testAutoIdWithOpTypeCreate() throws IOException {
153+
final String indexName = "auto_id_and_op_type_create_index";
154+
StringBuilder b = new StringBuilder();
155+
b.append("{\"create\": {\"_index\": \"").append(indexName).append("\"}}\n");
156+
b.append("{\"f1\": \"v\"}\n");
157+
Request bulk = new Request("POST", "/_bulk");
158+
bulk.addParameter("refresh", "true");
159+
bulk.setJsonEntity(b.toString());
160+
161+
switch (CLUSTER_TYPE) {
162+
case OLD:
163+
Request createTestIndex = new Request("PUT", "/" + indexName);
164+
createTestIndex.setJsonEntity("{\"settings\": {\"index.number_of_replicas\": 0}}");
165+
client().performRequest(createTestIndex);
166+
break;
167+
case MIXED:
168+
Request waitForGreen = new Request("GET", "/_cluster/health");
169+
waitForGreen.addParameter("wait_for_nodes", "3");
170+
client().performRequest(waitForGreen);
171+
172+
Version minNodeVersion = null;
173+
Map<?, ?> response = entityAsMap(client().performRequest(new Request("GET", "_nodes")));
174+
Map<?, ?> nodes = (Map<?, ?>) response.get("nodes");
175+
for (Map.Entry<?, ?> node : nodes.entrySet()) {
176+
Map<?, ?> nodeInfo = (Map<?, ?>) node.getValue();
177+
Version nodeVersion = Version.fromString(nodeInfo.get("version").toString());
178+
if (minNodeVersion == null) {
179+
minNodeVersion = nodeVersion;
180+
} else if (nodeVersion.before(minNodeVersion)) {
181+
minNodeVersion = nodeVersion;
182+
}
183+
}
184+
185+
if (minNodeVersion.before(Version.V_7_5_0)) {
186+
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(bulk));
187+
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
188+
assertThat(e.getMessage(),
189+
// if request goes to 7.5+ node
190+
either(containsString("optype create not supported for indexing requests without explicit id until"))
191+
// if request goes to < 7.5 node
192+
.or(containsString("an id must be provided if version type or value are set")
193+
));
194+
} else {
195+
client().performRequest(bulk);
196+
}
197+
break;
198+
case UPGRADED:
199+
client().performRequest(bulk);
200+
break;
201+
default:
202+
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
203+
}
204+
}
205+
148206
private void bulk(String index, String valueSuffix, int count) throws IOException {
149207
StringBuilder b = new StringBuilder();
150208
for (int i = 0; i < count; i++) {

rest-api-spec/src/main/resources/rest-api-spec/test/bulk/10_basic.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,50 @@
6161

6262
- match: { count: 2 }
6363

64+
---
65+
"Empty _id with op_type create":
66+
- skip:
67+
version: " - 7.99.99"
68+
reason: "auto id + op type create only supported since 8.0"
69+
70+
- do:
71+
bulk:
72+
refresh: true
73+
body:
74+
- index:
75+
_index: test
76+
_id: ''
77+
- f: 1
78+
- index:
79+
_index: test
80+
_id: id
81+
- f: 2
82+
- index:
83+
_index: test
84+
- f: 3
85+
- create:
86+
_index: test
87+
- f: 4
88+
- index:
89+
_index: test
90+
op_type: create
91+
- f: 5
92+
- match: { errors: true }
93+
- match: { items.0.index.status: 400 }
94+
- match: { items.0.index.error.type: illegal_argument_exception }
95+
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
96+
- match: { items.1.index.result: created }
97+
- match: { items.2.index.result: created }
98+
- match: { items.3.create.result: created }
99+
- match: { items.4.create.result: created }
100+
101+
- do:
102+
count:
103+
index: test
104+
105+
- match: { count: 4 }
106+
107+
64108
---
65109
"empty action":
66110

rest-api-spec/src/main/resources/rest-api-spec/test/bulk/11_basic_with_types.yml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,53 @@
5656

5757
- match: { count: 2 }
5858

59+
---
60+
"Empty _id with op_type create":
61+
- skip:
62+
version: " - 7.99.99"
63+
reason: "auto id + op type create only supported since 8.0"
64+
65+
- do:
66+
bulk:
67+
refresh: true
68+
body:
69+
- index:
70+
_index: test
71+
_type: type
72+
_id: ''
73+
- f: 1
74+
- index:
75+
_index: test
76+
_type: type
77+
_id: id
78+
- f: 2
79+
- index:
80+
_index: test
81+
_type: type
82+
- f: 3
83+
- create:
84+
_index: test
85+
_type: type
86+
- f: 4
87+
- index:
88+
_index: test
89+
op_type: create
90+
- f: 5
91+
- match: { errors: true }
92+
- match: { items.0.index.status: 400 }
93+
- match: { items.0.index.error.type: illegal_argument_exception }
94+
- match: { items.0.index.error.reason: if _id is specified it must not be empty }
95+
- match: { items.1.index.result: created }
96+
- match: { items.2.index.result: created }
97+
- match: { items.3.create.result: created }
98+
- match: { items.4.create.result: created }
99+
100+
- do:
101+
count:
102+
index: test
103+
104+
- match: { count: 4 }
105+
59106
---
60107
"empty action":
61108
- skip:

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,18 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
162162

163163
boolean hasIndexRequestsWithPipelines = false;
164164
final MetaData metaData = clusterService.state().getMetaData();
165+
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
165166
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
166167
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
167168
if (indexRequest != null) {
168169
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
169170
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
170171
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
171172
}
173+
174+
if (actionRequest instanceof IndexRequest) {
175+
((IndexRequest) actionRequest).checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
176+
}
172177
}
173178

174179
if (hasIndexRequestsWithPipelines) {

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,9 @@ public ActionRequestValidationException validate() {
206206
if (contentType == null) {
207207
validationException = addValidationError("content type is missing", validationException);
208208
}
209+
assert opType == OpType.INDEX || opType == OpType.CREATE : "unexpected op-type: " + opType;
209210
final long resolvedVersion = resolveVersionDefaults();
210-
if (opType() == OpType.CREATE) {
211+
if (opType == OpType.CREATE) {
211212
if (versionType != VersionType.INTERNAL) {
212213
validationException = addValidationError("create operations only support internal versioning. use index instead",
213214
validationException);
@@ -227,8 +228,11 @@ public ActionRequestValidationException validate() {
227228
}
228229
}
229230

230-
if (opType() != OpType.INDEX && id == null) {
231-
addValidationError("an id is required for a " + opType() + " operation", validationException);
231+
if (id == null) {
232+
if (versionType != VersionType.INTERNAL ||
233+
(resolvedVersion != Versions.MATCH_DELETED && resolvedVersion != Versions.MATCH_ANY)) {
234+
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
235+
}
232236
}
233237

234238
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
@@ -238,10 +242,6 @@ public ActionRequestValidationException validate() {
238242
id.getBytes(StandardCharsets.UTF_8).length, validationException);
239243
}
240244

241-
if (id == null && (versionType == VersionType.INTERNAL && resolvedVersion == Versions.MATCH_ANY) == false) {
242-
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
243-
}
244-
245245
if (pipeline != null && pipeline.isEmpty()) {
246246
validationException = addValidationError("pipeline cannot be an empty string", validationException);
247247
}
@@ -655,8 +655,16 @@ public void resolveRouting(MetaData metaData) {
655655
routing(metaData.resolveWriteIndexRouting(routing, index));
656656
}
657657

658+
public void checkAutoIdWithOpTypeCreateSupportedByVersion(Version version) {
659+
if (id == null && opType == OpType.CREATE && version.before(Version.V_7_5_0)) {
660+
throw new IllegalArgumentException("optype create not supported for indexing requests without explicit id until all nodes " +
661+
"are on version 7.5.0 or higher");
662+
}
663+
}
664+
658665
@Override
659666
public void writeTo(StreamOutput out) throws IOException {
667+
checkAutoIdWithOpTypeCreateSupportedByVersion(out.getVersion());
660668
super.writeTo(out);
661669
// A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
662670
// So we use the type accessor method here to make the type non-null (will default it to "_doc").

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,8 @@ private boolean canOptimizeAddDocument(Index index) {
803803
}
804804

805805
protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
806-
assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
806+
assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY) &&
807+
index.versionType() == VersionType.INTERNAL
807808
: "version: " + index.version() + " type: " + index.versionType();
808809
return true;
809810
}

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2425
import org.elasticsearch.action.delete.DeleteRequest;
@@ -28,6 +29,7 @@
2829
import org.elasticsearch.cluster.ClusterState;
2930
import org.elasticsearch.cluster.metadata.MetaData;
3031
import org.elasticsearch.cluster.node.DiscoveryNode;
32+
import org.elasticsearch.cluster.node.DiscoveryNodes;
3133
import org.elasticsearch.cluster.service.ClusterService;
3234
import org.elasticsearch.common.unit.TimeValue;
3335
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -107,6 +109,9 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
107109
ClusterState state = mock(ClusterState.class);
108110
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
109111
when(clusterService.state()).thenReturn(state);
112+
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
113+
when(state.getNodes()).thenReturn(discoveryNodes);
114+
when(discoveryNodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
110115
DiscoveryNode localNode = mock(DiscoveryNode.class);
111116
when(clusterService.localNode()).thenReturn(localNode);
112117
when(localNode.isIngestNode()).thenReturn(randomBoolean());

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ public void setupAction() {
189189
ImmutableOpenMap<String, DiscoveryNode> ingestNodes = ImmutableOpenMap.<String, DiscoveryNode>builder(2)
190190
.fPut("node1", remoteNode1).fPut("node2", remoteNode2).build();
191191
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
192+
when(nodes.getMinNodeVersion()).thenReturn(Version.CURRENT);
192193
ClusterState state = mock(ClusterState.class);
193194
when(state.getNodes()).thenReturn(nodes);
194195
MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()

server/src/test/java/org/elasticsearch/document/DocumentActionsIT.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,13 @@ public void testBulk() throws Exception {
189189
.add(client().prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")))
190190
.add(client().prepareIndex().setIndex("test").setType("type1").setId("2").setSource(source("2", "test")).setCreate(true))
191191
.add(client().prepareIndex().setIndex("test").setType("type1").setSource(source("3", "test")))
192+
.add(client().prepareIndex().setIndex("test").setType("type1").setCreate(true).setSource(source("4", "test")))
192193
.add(client().prepareDelete().setIndex("test").setType("type1").setId("1"))
193194
.add(client().prepareIndex().setIndex("test").setType("type1").setSource("{ xxx }", XContentType.JSON)) // failure
194195
.execute().actionGet();
195196

196197
assertThat(bulkResponse.hasFailures(), equalTo(true));
197-
assertThat(bulkResponse.getItems().length, equalTo(5));
198+
assertThat(bulkResponse.getItems().length, equalTo(6));
198199

199200
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
200201
assertThat(bulkResponse.getItems()[0].getOpType(), equalTo(OpType.INDEX));
@@ -215,15 +216,21 @@ public void testBulk() throws Exception {
215216
String generatedId3 = bulkResponse.getItems()[2].getId();
216217

217218
assertThat(bulkResponse.getItems()[3].isFailed(), equalTo(false));
218-
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.DELETE));
219+
assertThat(bulkResponse.getItems()[3].getOpType(), equalTo(OpType.CREATE));
219220
assertThat(bulkResponse.getItems()[3].getIndex(), equalTo(getConcreteIndexName()));
220221
assertThat(bulkResponse.getItems()[3].getType(), equalTo("type1"));
221-
assertThat(bulkResponse.getItems()[3].getId(), equalTo("1"));
222+
String generatedId4 = bulkResponse.getItems()[3].getId();
222223

223-
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true));
224-
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.INDEX));
224+
assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(false));
225+
assertThat(bulkResponse.getItems()[4].getOpType(), equalTo(OpType.DELETE));
225226
assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName()));
226227
assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1"));
228+
assertThat(bulkResponse.getItems()[4].getId(), equalTo("1"));
229+
230+
assertThat(bulkResponse.getItems()[5].isFailed(), equalTo(true));
231+
assertThat(bulkResponse.getItems()[5].getOpType(), equalTo(OpType.INDEX));
232+
assertThat(bulkResponse.getItems()[5].getIndex(), equalTo(getConcreteIndexName()));
233+
assertThat(bulkResponse.getItems()[5].getType(), equalTo("type1"));
227234

228235
waitForRelocation(ClusterHealthStatus.GREEN);
229236
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet();
@@ -243,6 +250,10 @@ public void testBulk() throws Exception {
243250
getResult = client().get(getRequest("test").type("type1").id(generatedId3)).actionGet();
244251
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("3", "test"))));
245252
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));
253+
254+
getResult = client().get(getRequest("test").id(generatedId4)).actionGet();
255+
assertThat("cycle #" + i, getResult.getSourceAsString(), equalTo(Strings.toString(source("4", "test"))));
256+
assertThat(getResult.getIndex(), equalTo(getConcreteIndexName()));
246257
}
247258
}
248259

0 commit comments

Comments
 (0)