Skip to content

Commit cb973c8

Browse files
ShardBulkAction ignore primary response on primary (#38901)
Previously, if a version conflict occurred and a previous primary response was present, the original primary response would be used both for sending to replica and back to client. This was made in the past as an attempt to fix issues with conflicts after relocations where a bulk request would experience a closed shard half way through and thus have to retry on the new primary. It could then fail on its own update. With sequence numbers, this leads to an issue, since if a primary is demoted (network partitions), it will send along the original response in the request. In case of a conflict on the new primary, the old response is sent to the replica. That data could be stale, leading to inconsistency between primary and replica. Relocations now do an explicit hand-off from old to new primary and ensures that no operations are active while doing this. Above is thus no longer necessary. This change removes the special handling of conflicts and ignores primary responses when executing shard bulk requests on the primary.
1 parent dfacbf8 commit cb973c8

File tree

4 files changed

+58
-22
lines changed

4 files changed

+58
-22
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,6 @@ public String getConcreteIndex() {
172172
return getCurrentItem().index();
173173
}
174174

175-
/** returns any primary response that was set by a previous primary */
176-
public BulkItemResponse getPreviousPrimaryResponse() {
177-
return getCurrentItem().getPrimaryResponse();
178-
}
179-
180175
/** returns a translog location that is needed to be synced in order to persist all operations executed so far */
181176
public Translog.Location getLocationToSync() {
182177
assert hasMoreOperationsToExecute() == false;

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -259,16 +259,7 @@ private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionCon
259259
context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
260260
}
261261

262-
final BulkItemResponse primaryResponse;
263-
// if it's a conflict failure, and we already executed the request on a primary (and we execute it
264-
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
265-
// then just use the response we got from the failed execution
266-
if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) {
267-
primaryResponse = context.getPreviousPrimaryResponse();
268-
} else {
269-
primaryResponse = executionResult;
270-
}
271-
context.markAsCompleted(primaryResponse);
262+
context.markAsCompleted(executionResult);
272263
} else {
273264
context.markAsCompleted(executionResult);
274265
}

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
144144
BulkShardRequest bulkShardRequest =
145145
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
146146

147+
randomlySetIgnoredPrimaryResponse(primaryRequest);
148+
147149
UpdateHelper updateHelper = null;
148150
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
149151
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -169,6 +171,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
169171
items[0] = primaryRequest;
170172
bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
171173

174+
randomlySetIgnoredPrimaryResponse(primaryRequest);
175+
172176
BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
173177
TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper,
174178
threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {});
@@ -271,6 +275,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
271275
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
272276
.thenReturn(mappingUpdate);
273277

278+
randomlySetIgnoredPrimaryResponse(items[0]);
279+
274280
// Pretend the mappings haven't made it to the node yet
275281
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
276282
AtomicInteger updateCalled = new AtomicInteger();
@@ -326,6 +332,8 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex
326332

327333
boolean errorOnWait = randomBoolean();
328334

335+
randomlySetIgnoredPrimaryResponse(items[0]);
336+
329337
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
330338
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
331339
errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(),
@@ -365,6 +373,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
365373
Translog.Location location = new Translog.Location(0, 0, 0);
366374
UpdateHelper updateHelper = null;
367375

376+
randomlySetIgnoredPrimaryResponse(items[0]);
377+
368378
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
369379
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
370380
new NoopMappingUpdatePerformer(), () -> {});
@@ -405,6 +415,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
405415

406416
location = context.getLocationToSync();
407417

418+
randomlySetIgnoredPrimaryResponse(items[0]);
419+
408420
context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
409421
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
410422
new NoopMappingUpdatePerformer(), () -> {});
@@ -459,6 +471,8 @@ public void testNoopUpdateRequest() throws Exception {
459471
BulkShardRequest bulkShardRequest =
460472
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
461473

474+
randomlySetIgnoredPrimaryResponse(primaryRequest);
475+
462476
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
463477
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
464478
new NoopMappingUpdatePerformer(), () -> {});
@@ -503,6 +517,7 @@ public void testUpdateRequestWithFailure() throws Exception {
503517
BulkShardRequest bulkShardRequest =
504518
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
505519

520+
randomlySetIgnoredPrimaryResponse(primaryRequest);
506521

507522
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
508523
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -551,6 +566,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
551566
BulkShardRequest bulkShardRequest =
552567
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
553568

569+
randomlySetIgnoredPrimaryResponse(primaryRequest);
554570

555571
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
556572
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -597,6 +613,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
597613
BulkShardRequest bulkShardRequest =
598614
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
599615

616+
randomlySetIgnoredPrimaryResponse(primaryRequest);
600617

601618
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
602619
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -642,6 +659,7 @@ public void testUpdateWithDelete() throws Exception {
642659
BulkShardRequest bulkShardRequest =
643660
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
644661

662+
randomlySetIgnoredPrimaryResponse(primaryRequest);
645663

646664
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
647665
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -675,6 +693,7 @@ public void testFailureDuringUpdateProcessing() throws Exception {
675693
BulkShardRequest bulkShardRequest =
676694
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
677695

696+
randomlySetIgnoredPrimaryResponse(primaryRequest);
678697

679698
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
680699
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -808,6 +827,14 @@ public void testRetries() throws Exception {
808827
assertThat(response.getSeqNo(), equalTo(13L));
809828
}
810829

830+
private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
831+
if (randomBoolean()) {
832+
// add a response to the request and thereby check that it is ignored for the primary.
833+
primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc",
834+
"ignore-primary-response-on-primary", 42, 42, 42, false)));
835+
}
836+
}
837+
811838
/**
812839
* Fake IndexResult that has a settable translog location
813840
*/

server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.DocWriteResponse;
2626
import org.elasticsearch.action.NoShardAvailableActionException;
2727
import org.elasticsearch.action.get.GetResponse;
28+
import org.elasticsearch.action.index.IndexRequestBuilder;
2829
import org.elasticsearch.action.index.IndexResponse;
2930
import org.elasticsearch.client.Client;
3031
import org.elasticsearch.cluster.ClusterState;
@@ -36,6 +37,7 @@
3637
import org.elasticsearch.common.settings.Settings;
3738
import org.elasticsearch.common.unit.TimeValue;
3839
import org.elasticsearch.common.xcontent.XContentType;
40+
import org.elasticsearch.index.VersionType;
3941
import org.elasticsearch.test.ESIntegTestCase;
4042
import org.elasticsearch.test.InternalTestCluster;
4143
import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -63,6 +65,7 @@
6365
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6466
import static org.hamcrest.Matchers.equalTo;
6567
import static org.hamcrest.Matchers.is;
68+
import static org.hamcrest.Matchers.isOneOf;
6669
import static org.hamcrest.Matchers.not;
6770

6871
/**
@@ -72,6 +75,18 @@
7275
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
7376
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
7477

78+
private enum ConflictMode {
79+
none,
80+
external,
81+
create;
82+
83+
84+
static ConflictMode randomMode() {
85+
ConflictMode[] values = values();
86+
return values[randomInt(values.length-1)];
87+
}
88+
}
89+
7590
/**
7691
* Test that we do not loose document whose indexing request was successful, under a randomly selected disruption scheme
7792
* We also collect & report the type of indexing failures that occur.
@@ -108,7 +123,9 @@ public void testAckedIndexing() throws Exception {
108123
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
109124
final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();
110125

111-
logger.info("starting indexers");
126+
final ConflictMode conflictMode = ConflictMode.randomMode();
127+
128+
logger.info("starting indexers using conflict mode " + conflictMode);
112129
try {
113130
for (final String node : nodes) {
114131
final Semaphore semaphore = new Semaphore(0);
@@ -128,12 +145,18 @@ public void testAckedIndexing() throws Exception {
128145
id = Integer.toString(idGenerator.incrementAndGet());
129146
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
130147
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
131-
IndexResponse response =
132-
client.prepareIndex("test", "type", id)
133-
.setSource("{}", XContentType.JSON)
134-
.setTimeout(timeout)
135-
.get(timeout);
136-
assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
148+
IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
149+
.setSource("{}", XContentType.JSON)
150+
.setTimeout(timeout);
151+
152+
if (conflictMode == ConflictMode.external) {
153+
indexRequestBuilder.setVersion(randomIntBetween(1,10)).setVersionType(VersionType.EXTERNAL);
154+
} else if (conflictMode == ConflictMode.create) {
155+
indexRequestBuilder.setCreate(true);
156+
}
157+
158+
IndexResponse response = indexRequestBuilder.get(timeout);
159+
assertThat(response.getResult(), isOneOf(DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED));
137160
ackedDocs.put(id, node);
138161
logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response);
139162
} catch (ElasticsearchException e) {

0 commit comments

Comments
 (0)