Skip to content

Commit 65a9b61

Browse files
authored
Add Seq# based optimistic concurrency control to UpdateRequest (#37872)
The update request has a lesser known support for a one off update of a known document version. This PR adds an a seq# based alternative to power these operations. Relates #36148 Relates #10708
1 parent 5d1964b commit 65a9b61

File tree

20 files changed

+450
-77
lines changed

20 files changed

+450
-77
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@
6868
import org.elasticsearch.rest.action.document.RestBulkAction;
6969
import org.elasticsearch.rest.action.document.RestDeleteAction;
7070
import org.elasticsearch.rest.action.document.RestGetAction;
71+
import org.elasticsearch.rest.action.document.RestIndexAction;
7172
import org.elasticsearch.rest.action.document.RestMultiGetAction;
7273
import org.elasticsearch.rest.action.document.RestUpdateAction;
73-
import org.elasticsearch.rest.action.document.RestIndexAction;
7474
import org.elasticsearch.script.Script;
7575
import org.elasticsearch.script.ScriptType;
7676
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
@@ -90,8 +90,10 @@
9090
import java.util.concurrent.atomic.AtomicReference;
9191

9292
import static java.util.Collections.singletonMap;
93+
import static org.hamcrest.Matchers.containsString;
9394
import static org.hamcrest.Matchers.empty;
9495
import static org.hamcrest.Matchers.equalTo;
96+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
9597
import static org.hamcrest.Matchers.hasSize;
9698
import static org.hamcrest.Matchers.instanceOf;
9799
import static org.hamcrest.Matchers.lessThan;
@@ -606,22 +608,46 @@ public void testUpdate() throws IOException {
606608
IndexResponse indexResponse = highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
607609
assertEquals(RestStatus.CREATED, indexResponse.status());
608610

609-
UpdateRequest updateRequest = new UpdateRequest("index", "id");
610-
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
611611

612-
UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
613-
assertEquals(RestStatus.OK, updateResponse.status());
614-
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());
615-
616-
UpdateRequest updateRequestConflict = new UpdateRequest("index", "id");
617-
updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values()));
618-
updateRequestConflict.version(indexResponse.getVersion());
612+
long lastUpdateSeqNo;
613+
long lastUpdatePrimaryTerm;
614+
{
615+
UpdateRequest updateRequest = new UpdateRequest("index", "id");
616+
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
617+
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
618+
assertEquals(RestStatus.OK, updateResponse.status());
619+
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());
620+
lastUpdateSeqNo = updateResponse.getSeqNo();
621+
lastUpdatePrimaryTerm = updateResponse.getPrimaryTerm();
622+
assertThat(lastUpdateSeqNo, greaterThanOrEqualTo(0L));
623+
assertThat(lastUpdatePrimaryTerm, greaterThanOrEqualTo(1L));
624+
}
619625

620-
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
621-
execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync));
622-
assertEquals(RestStatus.CONFLICT, exception.status());
623-
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: version conflict, " +
624-
"current version [2] is different than the one provided [1]]", exception.getMessage());
626+
{
627+
final UpdateRequest updateRequest = new UpdateRequest("index", "id");
628+
updateRequest.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values()));
629+
if (randomBoolean()) {
630+
updateRequest.setIfSeqNo(lastUpdateSeqNo + 1);
631+
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
632+
} else {
633+
updateRequest.setIfSeqNo(lastUpdateSeqNo + (randomBoolean() ? 0 : 1));
634+
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm + 1);
635+
}
636+
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
637+
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync));
638+
assertEquals(exception.toString(),RestStatus.CONFLICT, exception.status());
639+
assertThat(exception.getMessage(), containsString("Elasticsearch exception [type=version_conflict_engine_exception"));
640+
}
641+
{
642+
final UpdateRequest updateRequest = new UpdateRequest("index", "id");
643+
updateRequest.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values()));
644+
updateRequest.setIfSeqNo(lastUpdateSeqNo);
645+
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
646+
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
647+
assertEquals(RestStatus.OK, updateResponse.status());
648+
assertEquals(lastUpdateSeqNo + 1, updateResponse.getSeqNo());
649+
assertEquals(lastUpdatePrimaryTerm, updateResponse.getPrimaryTerm());
650+
}
625651
}
626652
{
627653
IndexRequest indexRequest = new IndexRequest("index").id("with_script");

docs/reference/docs/delete.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ The result of the above delete operation is:
3939
[[optimistic-concurrency-control-delete]]
4040
=== Optimistic concurrency control
4141

42-
Delete operations can be made optional and only be performed if the last
42+
Delete operations can be made conditional and only be performed if the last
4343
modification to the document was assigned the sequence number and primary
4444
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
4545
mismatch is detected, the operation will result in a `VersionConflictException`

docs/reference/docs/index_.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ The result of the above index operation is:
185185
[[optimistic-concurrency-control-index]]
186186
=== Optimistic concurrency control
187187

188-
Index operations can be made optional and only be performed if the last
188+
Index operations can be made conditional and only be performed if the last
189189
modification to the document was assigned the sequence number and primary
190190
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
191191
mismatch is detected, the operation will result in a `VersionConflictException`

docs/reference/docs/update.asciidoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,3 +349,11 @@ version numbers being out of sync with the external system. Use the
349349
<<docs-index_,`index` API>> instead.
350350
351351
=====================================================
352+
353+
`if_seq_no` and `if_primary_term`::
354+
355+
Update operations can be made conditional and only be performed if the last
356+
modification to the document was assigned the sequence number and primary
357+
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
358+
mismatch is detected, the operation will result in a `VersionConflictException`
359+
and a status code of 409. See <<optimistic-concurrency-control>> for more details.

rest-api-spec/src/main/resources/rest-api-spec/api/update.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@
6363
"type": "time",
6464
"description": "Explicit operation timeout"
6565
},
66+
"if_seq_no" : {
67+
"type" : "number",
68+
"description" : "only perform the update operation if the last operation that has changed the document has the specified sequence number"
69+
},
70+
"if_primary_term" : {
71+
"type" : "number",
72+
"description" : "only perform the update operation if the last operation that has changed the document has the specified primary term"
73+
},
6674
"version": {
6775
"type": "number",
6876
"description": "Explicit version number for concurrency control"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
---
2+
"Update with if_seq_no":
3+
4+
- skip:
5+
version: " - 6.99.99"
6+
reason: if_seq_no was added in 7.0
7+
8+
- do:
9+
catch: missing
10+
update:
11+
index: test_1
12+
id: 1
13+
if_seq_no: 1
14+
if_primary_term: 1
15+
body:
16+
doc: { foo: baz }
17+
18+
- do:
19+
index:
20+
index: test_1
21+
id: 1
22+
body:
23+
foo: baz
24+
25+
- do:
26+
catch: conflict
27+
update:
28+
index: test_1
29+
id: 1
30+
if_seq_no: 234
31+
if_primary_term: 1
32+
body:
33+
doc: { foo: baz }
34+
35+
- do:
36+
update:
37+
index: test_1
38+
id: 1
39+
if_seq_no: 0
40+
if_primary_term: 1
41+
body:
42+
doc: { foo: bar }
43+
44+
- do:
45+
get:
46+
index: test_1
47+
id: 1
48+
49+
- match: { _source: { foo: bar } }
50+
51+
- do:
52+
bulk:
53+
body:
54+
- update:
55+
_index: test_1
56+
_id: 1
57+
if_seq_no: 100
58+
if_primary_term: 200
59+
- doc:
60+
foo: baz
61+
62+
- match: { errors: true }
63+
- match: { items.0.update.status: 409 }
64+

server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,16 @@
2424
import org.elasticsearch.action.update.UpdateRequest;
2525
import org.elasticsearch.common.io.stream.StreamInput;
2626
import org.elasticsearch.common.io.stream.StreamOutput;
27+
import org.elasticsearch.common.lucene.uid.Versions;
2728
import org.elasticsearch.index.VersionType;
2829

2930
import java.io.IOException;
3031
import java.util.Locale;
3132

33+
import static org.elasticsearch.action.ValidateActions.addValidationError;
34+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
35+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
36+
3237
/**
3338
* Generic interface to group ActionRequest, which perform writes to a single document
3439
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
@@ -117,6 +122,39 @@ public interface DocWriteRequest<T> extends IndicesRequest {
117122
*/
118123
T versionType(VersionType versionType);
119124

125+
/**
126+
* only perform this request if the document was last modification was assigned the given
127+
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
128+
*
129+
* If the document last modification was assigned a different sequence number a
130+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
131+
*/
132+
T setIfSeqNo(long seqNo);
133+
134+
/**
135+
* only performs this request if the document was last modification was assigned the given
136+
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
137+
*
138+
* If the document last modification was assigned a different term a
139+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
140+
*/
141+
T setIfPrimaryTerm(long term);
142+
143+
/**
144+
* If set, only perform this request if the document was last modification was assigned this sequence number.
145+
* If the document last modification was assigned a different sequence number a
146+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
147+
*/
148+
long ifSeqNo();
149+
150+
/**
151+
* If set, only perform this request if the document was last modification was assigned this primary term.
152+
*
153+
* If the document last modification was assigned a different term a
154+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
155+
*/
156+
long ifPrimaryTerm();
157+
120158
/**
121159
* Get the requested document operation type of the request
122160
* @return the operation type {@link OpType}
@@ -216,4 +254,30 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest<?> request)
216254
throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]");
217255
}
218256
}
257+
258+
static ActionRequestValidationException validateSeqNoBasedCASParams(
259+
DocWriteRequest request, ActionRequestValidationException validationException) {
260+
if (request.versionType().validateVersionForWrites(request.version()) == false) {
261+
validationException = addValidationError("illegal version value [" + request.version() + "] for version type ["
262+
+ request.versionType().name() + "]", validationException);
263+
}
264+
if (request.versionType() == VersionType.FORCE) {
265+
validationException = addValidationError("version type [force] may no longer be used", validationException);
266+
}
267+
268+
if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && (
269+
request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY
270+
)) {
271+
validationException = addValidationError("compare and write operations can not use versioning", validationException);
272+
}
273+
if (request.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() != UNASSIGNED_SEQ_NO) {
274+
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
275+
}
276+
if (request.ifPrimaryTerm() != UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() == UNASSIGNED_SEQ_NO) {
277+
validationException =
278+
addValidationError("ifSeqNo is unassigned, but primary term is [" + request.ifPrimaryTerm() + "]", validationException);
279+
}
280+
281+
return validationException;
282+
}
219283
}

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
503503
} else if ("update".equals(action)) {
504504
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
505505
.version(version).versionType(versionType)
506+
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
506507
.routing(routing);
507508
// EMPTY is safe here because we never call namedObject
508509
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();

server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -110,27 +110,8 @@ public ActionRequestValidationException validate() {
110110
if (Strings.isEmpty(id)) {
111111
validationException = addValidationError("id is missing", validationException);
112112
}
113-
if (versionType.validateVersionForWrites(version) == false) {
114-
validationException = addValidationError("illegal version value [" + version + "] for version type ["
115-
+ versionType.name() + "]", validationException);
116-
}
117-
if (versionType == VersionType.FORCE) {
118-
validationException = addValidationError("version type [force] may no longer be used", validationException);
119-
}
120-
121-
if (ifSeqNo != UNASSIGNED_SEQ_NO && (
122-
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
123-
)) {
124-
validationException = addValidationError("compare and write operations can not use versioning", validationException);
125-
}
126113

127-
if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
128-
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
129-
}
130-
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
131-
validationException =
132-
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
133-
}
114+
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
134115

135116
return validationException;
136117
}

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -188,14 +188,7 @@ public ActionRequestValidationException validate() {
188188
addValidationError("an id is required for a " + opType() + " operation", validationException);
189189
}
190190

191-
if (!versionType.validateVersionForWrites(resolvedVersion)) {
192-
validationException = addValidationError("illegal version value [" + resolvedVersion + "] for version type ["
193-
+ versionType.name() + "]", validationException);
194-
}
195-
196-
if (versionType == VersionType.FORCE) {
197-
validationException = addValidationError("version type [force] may no longer be used", validationException);
198-
}
191+
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);
199192

200193
if (id != null && id.getBytes(StandardCharsets.UTF_8).length > 512) {
201194
validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " +
@@ -210,18 +203,6 @@ public ActionRequestValidationException validate() {
210203
validationException = addValidationError("pipeline cannot be an empty string", validationException);
211204
}
212205

213-
if (ifSeqNo != UNASSIGNED_SEQ_NO && (
214-
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
215-
)) {
216-
validationException = addValidationError("compare and write operations can not use versioning", validationException);
217-
}
218-
if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
219-
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
220-
}
221-
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
222-
validationException =
223-
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
224-
}
225206

226207
return validationException;
227208
}

0 commit comments

Comments
 (0)