Skip to content

Commit 8461ab1

Browse files
ShardBulkAction ignore primary response on primary (#38901)
Previously, if a version conflict occurred and a previous primary response was present, the original primary response would be used both for sending to the replica and for replying to the client. This was done in the past as an attempt to fix issues with conflicts after relocations, where a bulk request would experience a closed shard halfway through and thus have to retry on the new primary. It could then fail on its own update. With sequence numbers, this leads to an issue: if a primary is demoted (e.g. due to a network partition), it will send along the original response in the request. In case of a conflict on the new primary, the old response is sent to the replica. That data could be stale, leading to inconsistency between primary and replica. Relocations now do an explicit hand-off from the old to the new primary and ensure that no operations are active while doing this. The special handling described above is thus no longer necessary. This change removes the special handling of conflicts and ignores primary responses when executing shard bulk requests on the primary.
1 parent b1f829f commit 8461ab1

File tree

4 files changed

+56
-21
lines changed

4 files changed

+56
-21
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,6 @@ public String getConcreteIndex() {
172172
return getCurrentItem().index();
173173
}
174174

175-
/** returns any primary response that was set by a previous primary */
176-
public BulkItemResponse getPreviousPrimaryResponse() {
177-
return getCurrentItem().getPrimaryResponse();
178-
}
179-
180175
/** returns a translog location that is needed to be synced in order to persist all operations executed so far */
181176
public Translog.Location getLocationToSync() {
182177
assert hasMoreOperationsToExecute() == false;

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -261,16 +261,7 @@ private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionCon
261261
context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
262262
}
263263

264-
final BulkItemResponse primaryResponse;
265-
// if it's a conflict failure, and we already executed the request on a primary (and we execute it
266-
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
267-
// then just use the response we got from the failed execution
268-
if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) {
269-
primaryResponse = context.getPreviousPrimaryResponse();
270-
} else {
271-
primaryResponse = executionResult;
272-
}
273-
context.markAsCompleted(primaryResponse);
264+
context.markAsCompleted(executionResult);
274265
} else {
275266
context.markAsCompleted(executionResult);
276267
}

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
144144
BulkShardRequest bulkShardRequest =
145145
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
146146

147+
randomlySetIgnoredPrimaryResponse(primaryRequest);
148+
147149
UpdateHelper updateHelper = null;
148150
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
149151
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -169,6 +171,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
169171
items[0] = primaryRequest;
170172
bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
171173

174+
randomlySetIgnoredPrimaryResponse(primaryRequest);
175+
172176
BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
173177
TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper,
174178
threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {});
@@ -271,6 +275,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
271275
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
272276
.thenReturn(mappingUpdate);
273277

278+
randomlySetIgnoredPrimaryResponse(items[0]);
279+
274280
// Pretend the mappings haven't made it to the node yet
275281
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
276282
AtomicInteger updateCalled = new AtomicInteger();
@@ -326,6 +332,8 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex
326332

327333
boolean errorOnWait = randomBoolean();
328334

335+
randomlySetIgnoredPrimaryResponse(items[0]);
336+
329337
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
330338
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
331339
errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(),
@@ -365,6 +373,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
365373
Translog.Location location = new Translog.Location(0, 0, 0);
366374
UpdateHelper updateHelper = null;
367375

376+
randomlySetIgnoredPrimaryResponse(items[0]);
377+
368378
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
369379
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
370380
new NoopMappingUpdatePerformer(), () -> {});
@@ -405,6 +415,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
405415

406416
location = context.getLocationToSync();
407417

418+
randomlySetIgnoredPrimaryResponse(items[0]);
419+
408420
context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
409421
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
410422
new NoopMappingUpdatePerformer(), () -> {});
@@ -459,6 +471,8 @@ public void testNoopUpdateRequest() throws Exception {
459471
BulkShardRequest bulkShardRequest =
460472
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
461473

474+
randomlySetIgnoredPrimaryResponse(primaryRequest);
475+
462476
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
463477
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
464478
new NoopMappingUpdatePerformer(), () -> {});
@@ -503,6 +517,7 @@ public void testUpdateRequestWithFailure() throws Exception {
503517
BulkShardRequest bulkShardRequest =
504518
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
505519

520+
randomlySetIgnoredPrimaryResponse(primaryRequest);
506521

507522
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
508523
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -552,6 +567,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
552567
BulkShardRequest bulkShardRequest =
553568
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
554569

570+
randomlySetIgnoredPrimaryResponse(primaryRequest);
555571

556572
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
557573
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -598,6 +614,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
598614
BulkShardRequest bulkShardRequest =
599615
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
600616

617+
randomlySetIgnoredPrimaryResponse(primaryRequest);
601618

602619
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
603620
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -643,6 +660,7 @@ public void testUpdateWithDelete() throws Exception {
643660
BulkShardRequest bulkShardRequest =
644661
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
645662

663+
randomlySetIgnoredPrimaryResponse(primaryRequest);
646664

647665
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
648666
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -676,6 +694,7 @@ public void testFailureDuringUpdateProcessing() throws Exception {
676694
BulkShardRequest bulkShardRequest =
677695
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
678696

697+
randomlySetIgnoredPrimaryResponse(primaryRequest);
679698

680699
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
681700
TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -809,6 +828,14 @@ public void testRetries() throws Exception {
809828
assertThat(response.getSeqNo(), equalTo(13L));
810829
}
811830

831+
private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
832+
if (randomBoolean()) {
833+
// add a response to the request and thereby check that it is ignored for the primary.
834+
primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc",
835+
"ignore-primary-response-on-primary", 42, 42, 42, false)));
836+
}
837+
}
838+
812839
/**
813840
* Fake IndexResult that has a settable translog location
814841
*/

server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.action.ActionListener;
2626
import org.elasticsearch.action.NoShardAvailableActionException;
2727
import org.elasticsearch.action.get.GetResponse;
28+
import org.elasticsearch.action.index.IndexRequestBuilder;
2829
import org.elasticsearch.action.index.IndexResponse;
2930
import org.elasticsearch.client.Client;
3031
import org.elasticsearch.cluster.ClusterState;
@@ -37,6 +38,7 @@
3738
import org.elasticsearch.common.settings.Settings;
3839
import org.elasticsearch.common.unit.TimeValue;
3940
import org.elasticsearch.common.xcontent.XContentType;
41+
import org.elasticsearch.index.VersionType;
4042
import org.elasticsearch.test.ESIntegTestCase;
4143
import org.elasticsearch.test.InternalTestCluster;
4244
import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -75,6 +77,18 @@
7577
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
7678
public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
7779

80+
private enum ConflictMode {
81+
none,
82+
external,
83+
create;
84+
85+
86+
static ConflictMode randomMode() {
87+
ConflictMode[] values = values();
88+
return values[randomInt(values.length-1)];
89+
}
90+
}
91+
7892
/**
7993
* Test that we do not lose documents whose indexing request was successful, under a randomly selected disruption scheme
8094
* We also collect & report the type of indexing failures that occur.
@@ -111,7 +125,9 @@ public void testAckedIndexing() throws Exception {
111125
final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
112126
final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();
113127

114-
logger.info("starting indexers");
128+
final ConflictMode conflictMode = ConflictMode.randomMode();
129+
130+
logger.info("starting indexers using conflict mode " + conflictMode);
115131
try {
116132
for (final String node : nodes) {
117133
final Semaphore semaphore = new Semaphore(0);
@@ -131,11 +147,17 @@ public void testAckedIndexing() throws Exception {
131147
id = Integer.toString(idGenerator.incrementAndGet());
132148
int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
133149
logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
134-
IndexResponse response =
135-
client.prepareIndex("test", "type", id)
136-
.setSource("{}", XContentType.JSON)
137-
.setTimeout(timeout)
138-
.get(timeout);
150+
IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
151+
.setSource("{}", XContentType.JSON)
152+
.setTimeout(timeout);
153+
154+
if (conflictMode == ConflictMode.external) {
155+
indexRequestBuilder.setVersion(randomIntBetween(1,10)).setVersionType(VersionType.EXTERNAL);
156+
} else if (conflictMode == ConflictMode.create) {
157+
indexRequestBuilder.setCreate(true);
158+
}
159+
160+
IndexResponse response = indexRequestBuilder.get(timeout);
139161
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
140162
ackedDocs.put(id, node);
141163
logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response);

0 commit comments

Comments
 (0)