From b1a306d44fcbeb2e309b811bd928fe3b6098e46e Mon Sep 17 00:00:00 2001
From: Henning Andersen
Date: Thu, 14 Feb 2019 15:06:41 +0100
Subject: [PATCH 1/3] ShardBulkAction ignore primary response on primary

Previously, if a version conflict occurred and a previous primary response
was present, the original primary response would be used both for sending
to the replica and back to the client. This was an attempt to fix issues
with conflicts after relocations, where a bulk request could hit a closed
shard halfway through and then had to retry on the new primary.

With sequence numbers this leads to a problem: if a primary is demoted
(for instance due to a network partition), it sends the original response
along in the retried request. On a conflict on the new primary, that old
response is then sent to the replica. Since that data can be stale, primary
and replica can become inconsistent.

Relocations now do an explicit hand-off from the old to the new primary and
ensure that no operations are in flight while doing so, so the special
handling above is no longer necessary. This change removes the special
handling of conflicts and ignores any existing primary response when
executing shard bulk requests on the primary.
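
For context, a minimal sketch of the behaviour change (illustrative only,
using generic placeholders rather than the real BulkItemResponse plumbing;
the actual change is in finalizePrimaryOperationOnCompletion in the diff
below):

    // old behaviour: on a conflict, fall back to the response set by a
    // previous primary, which may be stale if that primary was demoted;
    // new behaviour: always use the result of the current execution
    static <R> R responseToUse(boolean isConflict, R previousPrimaryResponse, R executionResult) {
        if (isConflict && previousPrimaryResponse != null) {
            return previousPrimaryResponse; // branch removed by this change
        }
        return executionResult;             // kept: sent to replicas and client
    }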
---
 .../bulk/BulkPrimaryExecutionContext.java     |  5 --
 .../action/bulk/TransportShardBulkAction.java | 11 +---
 .../bulk/TransportShardBulkActionTests.java   | 27 ++++++++++
 .../discovery/ClusterDisruptionIT.java        | 51 ++++++++++++++++---
 4 files changed, 72 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java
index 5f61d90d500e7..65452f9a75dba 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java
@@ -172,11 +172,6 @@ public String getConcreteIndex() {
         return getCurrentItem().index();
     }
 
-    /** returns any primary response that was set by a previous primary */
-    public BulkItemResponse getPreviousPrimaryResponse() {
-        return getCurrentItem().getPrimaryResponse();
-    }
-
     /** returns a translog location that is needed to be synced in order to persist all operations executed so far */
     public Translog.Location getLocationToSync() {
         assert hasMoreOperationsToExecute() == false;

diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 4df8efa6b2743..d5f26e59d4c87 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -261,16 +261,7 @@ private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionCon
                     context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure);
             }
 
-            final BulkItemResponse primaryResponse;
-            // if it's a conflict failure, and we already executed the request on a primary (and we execute it
-            // again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
-            // then just use the response we got from the failed execution
-            if (TransportShardBulkAction.isConflictException(failure) && context.getPreviousPrimaryResponse() != null) {
-                primaryResponse = context.getPreviousPrimaryResponse();
-            } else {
-                primaryResponse = executionResult;
-            }
-            context.markAsCompleted(primaryResponse);
+            context.markAsCompleted(executionResult);
         } else {
             context.markAsCompleted(executionResult);
         }

diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
index 37e82884c5133..1ef718e785e9f 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
@@ -144,6 +144,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
+
         UpdateHelper updateHelper = null;
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -169,6 +171,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
         items[0] = primaryRequest;
         bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
+
         BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, threadPool::absoluteTimeInMillis,
             new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {});
@@ -271,6 +275,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
         when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
             .thenReturn(mappingUpdate);
 
+        randomSetIgnoredPrimaryResponse(items[0]);
+
         // Pretend the mappings haven't made it to the node yet
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         AtomicInteger updateCalled = new AtomicInteger();
@@ -326,6 +332,8 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex
 
         boolean errorOnWait = randomBoolean();
 
+        randomSetIgnoredPrimaryResponse(items[0]);
+
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, errorOnWait == false ?
             new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(),
@@ -365,6 +373,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
         Translog.Location location = new Translog.Location(0, 0, 0);
         UpdateHelper updateHelper = null;
 
+        randomSetIgnoredPrimaryResponse(items[0]);
+
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(), () -> {});
@@ -405,6 +415,8 @@ public void testExecuteBulkDeleteRequest() throws Exception {
 
         location = context.getLocationToSync();
 
+        randomSetIgnoredPrimaryResponse(items[0]);
+
         context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(), () -> {});
@@ -459,6 +471,8 @@ public void testNoopUpdateRequest() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
+
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
             new NoopMappingUpdatePerformer(), () -> {});
@@ -503,6 +517,7 @@ public void testUpdateRequestWithFailure() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -552,6 +567,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -598,6 +614,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -643,6 +660,7 @@ public void testUpdateWithDelete() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -676,6 +694,7 @@ public void testFailureDuringUpdateProcessing() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
+        randomSetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -809,6 +828,14 @@ public void testRetries() throws Exception {
         assertThat(response.getSeqNo(), equalTo(13L));
     }
 
+    private void randomSetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
+        if (randomBoolean()) {
+            // add a response to the request and thereby check that it is ignored for the primary.
+            primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc",
+                "ignore-primary-response-on-primary", 42, 42, 42, false)));
+        }
+    }
+
     /**
      * Fake IndexResult that has a settable translog location
      */

diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
index d8262dc4f576d..bbac6bc125d78 100644
--- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
+++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -37,6 +38,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -75,6 +77,33 @@
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
 public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
 
+    private enum ConflictMode {
+        none,
+        external,
+        create;
+
+
+        static ConflictMode randomMode() {
+            ConflictMode[] values = values();
+            return values[randomInt(values.length-1)];
+        }
+    }
+
+
+    // once this has proven to work out fine in all cases, we can revert this to randomly picking the conflict mode.
+    public void testAckedIndexCreateOnly() throws Exception {
+        testAckedIndexing(ConflictMode.create);
+    }
+
+    public void testAckedIndexExternalVersioning() throws Exception {
+        testAckedIndexing(ConflictMode.external);
+    }
+
+    public void testAckedIndexing() throws Exception {
+        testAckedIndexing(ConflictMode.none);
+    }
+
+
     /**
      * Test that we do not loose document whose indexing request was successful, under a randomly selected disruption scheme
      * We also collect & report the type of indexing failures that occur.
@@ -85,7 +114,7 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
         "org.elasticsearch.discovery:TRACE,org.elasticsearch.action.support.replication:TRACE," +
             "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
             "org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
-    public void testAckedIndexing() throws Exception {
+    private void testAckedIndexing(final ConflictMode conflictMode) throws Exception {
         final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
         final String timeout = seconds + "s";
 
@@ -111,7 +140,9 @@ public void testAckedIndexing() throws Exception {
         final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
         final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();
 
-        logger.info("starting indexers");
+//        final ConflictMode conflictMode = ConflictMode.randomMode();
+
+        logger.info("starting indexers using conflict mode " + conflictMode);
         try {
             for (final String node : nodes) {
                 final Semaphore semaphore = new Semaphore(0);
@@ -131,11 +162,17 @@ public void testAckedIndexing() throws Exception {
                             id = Integer.toString(idGenerator.incrementAndGet());
                             int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
                             logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
-                            IndexResponse response =
-                                client.prepareIndex("test", "type", id)
-                                    .setSource("{}", XContentType.JSON)
-                                    .setTimeout(timeout)
-                                    .get(timeout);
+                            IndexRequestBuilder indexRequestBuilder = client.prepareIndex("test", "type", id)
+                                .setSource("{}", XContentType.JSON)
+                                .setTimeout(timeout);
+
+                            if (conflictMode == ConflictMode.external) {
+                                indexRequestBuilder.setVersion(10).setVersionType(VersionType.EXTERNAL);
+                            } else if (conflictMode == ConflictMode.create) {
+                                indexRequestBuilder.setCreate(true);
+                            }
+
+                            IndexResponse response = indexRequestBuilder.get(timeout);
                             assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
                             ackedDocs.put(id, node);
                             logger.trace("[{}] indexed id [{}] through node [{}], response [{}]", name, id, node, response);

From bbddad04e96a60b362f17e7cadff3579174b346b Mon Sep 17 00:00:00 2001
From: Henning Andersen
Date: Thu, 14 Feb 2019 15:43:19 +0100
Subject: [PATCH 2/3] ShardBulkAction ignore primary response on primary

Better naming of test methods and use a random external version.
---
 .../bulk/TransportShardBulkActionTests.java   | 26 +++++++++----------
 .../discovery/ClusterDisruptionIT.java        |  6 ++---
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
index 1ef718e785e9f..e52924585d2c4 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
@@ -144,7 +144,7 @@ public void testExecuteBulkIndexRequest() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
 
         UpdateHelper updateHelper = null;
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
@@ -171,7 +171,7 @@ public void testExecuteBulkIndexRequest() throws Exception {
         items[0] = primaryRequest;
         bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
 
         BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper,
@@ -275,7 +275,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
         when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
             .thenReturn(mappingUpdate);
 
-        randomSetIgnoredPrimaryResponse(items[0]);
+        randomlySetIgnoredPrimaryResponse(items[0]);
 
         // Pretend the mappings haven't made it to the node yet
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
@@ -332,7 +332,7 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex
 
         boolean errorOnWait = randomBoolean();
 
-        randomSetIgnoredPrimaryResponse(items[0]);
+        randomlySetIgnoredPrimaryResponse(items[0]);
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -373,7 +373,7 @@ public void testExecuteBulkDeleteRequest() throws Exception {
         Translog.Location location = new Translog.Location(0, 0, 0);
         UpdateHelper updateHelper = null;
 
-        randomSetIgnoredPrimaryResponse(items[0]);
+        randomlySetIgnoredPrimaryResponse(items[0]);
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -415,7 +415,7 @@ public void testExecuteBulkDeleteRequest() throws Exception {
 
         location = context.getLocationToSync();
 
-        randomSetIgnoredPrimaryResponse(items[0]);
+        randomlySetIgnoredPrimaryResponse(items[0]);
 
         context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -471,7 +471,7 @@ public void testNoopUpdateRequest() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
 
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -517,7 +517,7 @@ public void testUpdateRequestWithFailure() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -567,7 +567,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -614,7 +614,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -660,7 +660,7 @@ public void testUpdateWithDelete() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -694,7 +694,7 @@ public void testFailureDuringUpdateProcessing() throws Exception {
 
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
-        randomSetIgnoredPrimaryResponse(primaryRequest);
+        randomlySetIgnoredPrimaryResponse(primaryRequest);
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis,
@@ -828,7 +828,7 @@ public void testRetries() throws Exception {
         assertThat(response.getSeqNo(), equalTo(13L));
     }
 
-    private void randomSetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
+    private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
         if (randomBoolean()) {
             // add a response to the request and thereby check that it is ignored for the primary.
             primaryRequest.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new IndexResponse(null, "_doc",

diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
index bbac6bc125d78..85b857f4f09d7 100644
--- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
+++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
@@ -91,11 +91,11 @@ static ConflictMode randomMode() {
 
 
     // once this has proven to work out fine in all cases, we can revert this to randomly picking the conflict mode.
-    public void testAckedIndexCreateOnly() throws Exception {
+    public void testAckedIndexWithCreateOpType() throws Exception {
         testAckedIndexing(ConflictMode.create);
     }
 
-    public void testAckedIndexExternalVersioning() throws Exception {
+    public void testAckedIndexWithExternalVersioning() throws Exception {
         testAckedIndexing(ConflictMode.external);
     }
 
@@ -167,7 +167,7 @@ private void testAckedIndexing(final ConflictMode conflictMode) throws Exception
                                 .setTimeout(timeout);
 
                             if (conflictMode == ConflictMode.external) {
-                                indexRequestBuilder.setVersion(10).setVersionType(VersionType.EXTERNAL);
+                                indexRequestBuilder.setVersion(randomIntBetween(1,10)).setVersionType(VersionType.EXTERNAL);
                             } else if (conflictMode == ConflictMode.create) {
                                 indexRequestBuilder.setCreate(true);
                             }

From d8d50395eb16394495fc09c691c513897aec93c4 Mon Sep 17 00:00:00 2001
From: Henning Andersen
Date: Thu, 14 Feb 2019 17:16:52 +0100
Subject: [PATCH 3/3] ShardBulkAction ignore primary response on primary

Collapse 3 tests into one and pick the mode randomly instead.
---
 .../discovery/ClusterDisruptionIT.java        | 19 ++-----------------
 1 file changed, 2 insertions(+), 17 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
index 85b857f4f09d7..9fd08511446d7 100644
--- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
+++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java
@@ -89,21 +89,6 @@ static ConflictMode randomMode() {
         }
     }
 
-
-    // once this has proven to work out fine in all cases, we can revert this to randomly picking the conflict mode.
-    public void testAckedIndexWithCreateOpType() throws Exception {
-        testAckedIndexing(ConflictMode.create);
-    }
-
-    public void testAckedIndexWithExternalVersioning() throws Exception {
-        testAckedIndexing(ConflictMode.external);
-    }
-
-    public void testAckedIndexing() throws Exception {
-        testAckedIndexing(ConflictMode.none);
-    }
-
-
     /**
      * Test that we do not loose document whose indexing request was successful, under a randomly selected disruption scheme
      * We also collect & report the type of indexing failures that occur.
@@ -114,7 +99,7 @@ public void testAckedIndexing() throws Exception {
         "org.elasticsearch.discovery:TRACE,org.elasticsearch.action.support.replication:TRACE," +
             "org.elasticsearch.cluster.service:TRACE,org.elasticsearch.indices.recovery:TRACE," +
             "org.elasticsearch.indices.cluster:TRACE,org.elasticsearch.index.shard:TRACE")
-    private void testAckedIndexing(final ConflictMode conflictMode) throws Exception {
+    public void testAckedIndexing() throws Exception {
         final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
         final String timeout = seconds + "s";
 
@@ -140,7 +125,7 @@ private void testAckedIndexing(final ConflictMode conflictMode) throws Exception
         final AtomicReference<CountDownLatch> countDownLatchRef = new AtomicReference<>();
         final List<Exception> exceptedExceptions = new CopyOnWriteArrayList<>();
 
-//        final ConflictMode conflictMode = ConflictMode.randomMode();
+        final ConflictMode conflictMode = ConflictMode.randomMode();
 
         logger.info("starting indexers using conflict mode " + conflictMode);
         try {