From 09c44a3607754992edea089d62fcd01a5f207260 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 23 Jan 2019 11:54:00 +0100 Subject: [PATCH 1/5] Add unit tests to ShardStateAction --- .../ClusterStateCreationUtils.java | 3 + ...dStartedClusterStateTaskExecutorTests.java | 191 ++++++++++++++++++ .../action/shard/ShardStateActionTests.java | 148 +++++++------- 3 files changed, 266 insertions(+), 76 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 60053748d68c9..6d73d1f1040c4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -119,6 +119,9 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard if (primaryState == ShardRoutingState.RELOCATING) { relocatingNode = selectAndRemove(unassignedNodes); } + if (primaryState == ShardRoutingState.INITIALIZING) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); + } } else { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java new file mode 100644 index 0000000000000..9419fb6866c69 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -0,0 +1,191 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.action.shard; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static java.util.Collections.singletonList; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithNoShard; +import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase { + + private ClusterState clusterState; + private ShardStateAction.ShardStartedClusterStateTaskExecutor executor; + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterState = stateWithNoShard(); + AllocationService allocationService = createAllocationService(Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE) + .build()); + executor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger); + } + + public void testEmptyTaskListProducesSameClusterState() throws Exception { + assertTasksExecution(Collections.emptyList(), result -> assertSame(clusterState, result.resultingState)); + } + + public void testNonExistentIndexMarkedAsSuccessful() throws Exception { + final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test"); + assertTasksExecution(singletonList(entry), + result -> { + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(entry), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true)); + }); + } + + public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { + final String indexName = "test"; + clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2)); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final List tasks = Stream.concat( + // Existent shard id but different allocation id + IntStream.range(0, randomIntBetween(1, 5)) + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), "allocation id")), + // Non existent shard id + IntStream.range(1, randomIntBetween(2, 5)) + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), "shard id")) + + ).collect(Collectors.toList()); + + assertTasksExecution(tasks, result -> { + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + }); + }); + } + + public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { + final String indexName = "test"; + clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final List tasks = IntStream.range(0, randomIntBetween(1, indexMetaData.getNumberOfShards())) + .mapToObj(i -> { + final ShardId shardId = new ShardId(indexMetaData.getIndex(), i); + final IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId); + final String allocationId; + if (randomBoolean()) { + allocationId = shardRoutingTable.primaryShard().allocationId().getId(); + } else { + allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId(); + } + return new StartedShardEntry(shardId, allocationId, "test"); + }).collect(Collectors.toList()); + + assertTasksExecution(tasks, result -> { + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + }); + }); + } + + public void testStartedShards() throws Exception { + final String indexName = "test"; + clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); + final String primaryAllocationId = primaryShard.allocationId().getId(); + + final List tasks = new ArrayList<>(); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, "test")); + if (randomBoolean()) { + final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); + final String replicaAllocationId = replicaShard.allocationId().getId(); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test")); + } + assertTasksExecution(tasks, result -> { + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + }); + }); + } + + public void testDuplicateStartsAreOkay() throws Exception { + final String indexName = "test"; + clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); + final String allocationId = shardRouting.allocationId().getId(); + + final List tasks = IntStream.range(0, randomIntBetween(2, 10)) + .mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test")) + .collect(Collectors.toList()); + + assertTasksExecution(tasks, result -> { + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + }); + }); + } + + private void assertTasksExecution(final List tasks, + final Consumer consumer) throws Exception { + final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(clusterState, tasks); + assertThat(result, notNullValue()); + consumer.accept(result); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 2a994e2861836..431f67c842d63 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; @@ -156,24 +157,9 @@ public void testSuccess() throws InterruptedException { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - AtomicBoolean success = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - + final TestListener listener = new TestListener(); ShardRouting shardRouting = getRandomShardRouting(index); - shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onSuccess() { - success.set(true); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - success.set(false); - latch.countDown(); - assert false; - } - }); + shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), listener); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertEquals(1, capturedRequests.length); @@ -188,8 +174,8 @@ public void onFailure(Exception e) { transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); - latch.await(); - assertTrue(success.get()); + listener.await(); + assertTrue(listener.isSuccessful()); } public void testNoMaster() throws InterruptedException { @@ -291,28 +277,14 @@ public void testUnhandledFailure() { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - AtomicBoolean failure = new AtomicBoolean(); - + final TestListener listener = new TestListener(); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onSuccess() { - failure.set(false); - assert false; - } - - @Override - public void onFailure(Exception e) { - failure.set(true); - } - }); + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), listener); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); - assertFalse(failure.get()); transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated")); - - assertTrue(failure.get()); + assertFalse(listener.isSuccessful()); } public void testShardNotFound() throws InterruptedException { @@ -320,32 +292,17 @@ public void testShardNotFound() throws InterruptedException { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - AtomicBoolean success = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - + final TestListener listener = new TestListener(); ShardRouting failedShard = getRandomShardRouting(index); RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build(); setState(clusterService, ClusterState.builder(clusterService.state()).routingTable(routingTable)); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { - @Override - public void onSuccess() { - success.set(true); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - success.set(false); - latch.countDown(); - assert false; - } - }); + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), listener); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); - latch.await(); - assertTrue(success.get()); + listener.await(); + assertTrue(listener.isSuccessful()); } public void testNoLongerPrimaryShardException() throws InterruptedException { @@ -355,36 +312,21 @@ public void testNoLongerPrimaryShardException() throws InterruptedException { ShardRouting failedShard = getRandomShardRouting(index); - AtomicReference failure = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - + final TestListener listener = new TestListener(); long primaryTerm = clusterService.state().metaData().index(index).primaryTerm(failedShard.id()); assertThat(primaryTerm, greaterThanOrEqualTo(1L)); shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), - primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), - new ShardStateAction.Listener() { - @Override - public void onSuccess() { - failure.set(null); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - failure.set(e); - latch.countDown(); - } - }); + primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), listener); ShardStateAction.NoLongerPrimaryShardException catastrophicError = new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "dummy failure"); CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError); - latch.await(); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); - assertThat(failure.get().getMessage(), equalTo(catastrophicError.getMessage())); + listener.await(); + assertNotNull(listener.getFailure()); + assertThat(listener.getFailure(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); + assertThat(listener.getFailure().getMessage(), equalTo(catastrophicError.getMessage())); } public void testCacheRemoteShardFailed() throws Exception { @@ -471,6 +413,24 @@ public void onFailure(Exception e) { masterThread.join(); } + public void testShardStarted() throws InterruptedException { + final String index = "test"; + setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); + + final ShardRouting shardRouting = getRandomShardRouting(index); + final TestListener listener = new TestListener(); + shardStateAction.shardStarted(shardRouting, "testShardStarted", listener); + + final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + listener.await(); + + assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); + ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request; + assertThat(entry.shardId, equalTo(shardRouting.shardId())); + assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); + } + private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt(); @@ -600,4 +560,40 @@ public void onFailure(Exception e) { } }); } + + private static class TestListener implements ShardStateAction.Listener { + + private final SetOnce failure = new SetOnce<>(); + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onSuccess() { + try { + failure.set(null); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(final Exception e) { + try { + failure.set(e); + } finally { + latch.countDown(); + } + } + + boolean isSuccessful() { + return getFailure() == null; + } + + Exception getFailure() { + return failure.get(); + } + + void await() throws InterruptedException { + latch.await(); + } + } } From f63b0adb1d26b89450e75d103704344fd08df326 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 23 Jan 2019 13:45:52 +0100 Subject: [PATCH 2/5] Adapt ClusterAllocationExplainActionTests --- .../ClusterAllocationExplainActionTests.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java index a75510cfb64ef..d0a55972cc1d8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainActionTests.java @@ -24,6 +24,8 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationDecision; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; @@ -35,6 +37,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.gateway.TestGatewayAllocator; +import java.time.Instant; import java.util.Collections; import java.util.Locale; @@ -85,7 +88,16 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing "wait until initialization has completed"; } assertEquals("{\"index\":\"idx\",\"shard\":0,\"primary\":true,\"current_state\":\"" + - shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\",\"current_node\":" + + shardRoutingState.toString().toLowerCase(Locale.ROOT) + "\"" + + (shard.unassignedInfo() != null ? + ",\"unassigned_info\":{" + + "\"reason\":\"" + shard.unassignedInfo().getReason() + "\"," + + "\"at\":\""+ UnassignedInfo.DATE_TIME_FORMATTER.format( + Instant.ofEpochMilli(shard.unassignedInfo().getUnassignedTimeInMillis())) + "\"," + + "\"last_allocation_status\":\"" + AllocationDecision.fromAllocationStatus( + shard.unassignedInfo().getLastAllocationStatus()) + "\"}" + : "") + + ",\"current_node\":" + "{\"id\":\"" + cae.getCurrentNode().getId() + "\",\"name\":\"" + cae.getCurrentNode().getName() + "\",\"transport_address\":\"" + cae.getCurrentNode().getAddress() + "\"},\"explanation\":\"" + explanation + "\"}", Strings.toString(builder)); From 7223d31e5578cad6667c452f4eb324890c71d505 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 23 Jan 2019 14:23:53 +0100 Subject: [PATCH 3/5] check that shards are started --- .../shard/ShardStartedClusterStateTaskExecutorTests.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index 9419fb6866c69..cd224955c0f4d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -155,6 +155,9 @@ public void testStartedShards() throws Exception { tasks.forEach(task -> { assertThat(result.executionResults.containsKey(task), is(true)); assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); }); }); } @@ -178,6 +181,9 @@ public void testDuplicateStartsAreOkay() throws Exception { tasks.forEach(task -> { assertThat(result.executionResults.containsKey(task), is(true)); assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); }); }); } From 1093049a847e964e3196240ad3bebf6f16db96da Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 25 Jan 2019 13:27:15 +0100 Subject: [PATCH 4/5] apply feedback --- .../ClusterStateCreationUtils.java | 3 +- ...rdFailedClusterStateTaskExecutorTests.java | 2 +- ...dStartedClusterStateTaskExecutorTests.java | 32 +++++++++---------- .../action/shard/ShardStateActionTests.java | 30 ++++++++--------- 4 files changed, 31 insertions(+), 36 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 6d73d1f1040c4..6b628d88c59d4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -118,8 +118,7 @@ public static ClusterState state(String index, boolean activePrimaryLocal, Shard } if (primaryState == ShardRoutingState.RELOCATING) { relocatingNode = selectAndRemove(unassignedNodes); - } - if (primaryState == ShardRoutingState.INITIALIZING) { + } else if (primaryState == ShardRoutingState.INITIALIZING) { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); } } else { diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 60a5d4a3e3f1f..9ee0cf368ee5c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -73,7 +73,7 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa private ClusterState clusterState; private ShardStateAction.ShardFailedClusterStateTaskExecutor executor; - @Before + @Override public void setUp() throws Exception { super.setUp(); allocationService = createAllocationService(Settings.builder() diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index cd224955c0f4d..1d3a523cdc94f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; -import org.junit.Before; import java.util.ArrayList; import java.util.Collections; @@ -52,13 +51,11 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestCase { - private ClusterState clusterState; private ShardStateAction.ShardStartedClusterStateTaskExecutor executor; - @Before + @Override public void setUp() throws Exception { super.setUp(); - clusterState = stateWithNoShard(); AllocationService allocationService = createAllocationService(Settings.builder() .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE) .build()); @@ -66,12 +63,14 @@ public void setUp() throws Exception { } public void testEmptyTaskListProducesSameClusterState() throws Exception { - assertTasksExecution(Collections.emptyList(), result -> assertSame(clusterState, result.resultingState)); + final ClusterState clusterState = stateWithNoShard(); + assertTasksExecution(clusterState, Collections.emptyList(), result -> assertSame(clusterState, result.resultingState)); } public void testNonExistentIndexMarkedAsSuccessful() throws Exception { + final ClusterState clusterState = stateWithNoShard(); final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test"); - assertTasksExecution(singletonList(entry), + assertTasksExecution(clusterState, singletonList(entry), result -> { assertSame(clusterState, result.resultingState); assertThat(result.executionResults.size(), equalTo(1)); @@ -82,7 +81,7 @@ public void testNonExistentIndexMarkedAsSuccessful() throws Exception { public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { final String indexName = "test"; - clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2)); + final ClusterState clusterState = stateWithActivePrimary(indexName, true, randomInt(2), randomInt(2)); final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final List tasks = Stream.concat( @@ -95,7 +94,7 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { ).collect(Collectors.toList()); - assertTasksExecution(tasks, result -> { + assertTasksExecution(clusterState, tasks, result -> { assertSame(clusterState, result.resultingState); assertThat(result.executionResults.size(), equalTo(tasks.size())); tasks.forEach(task -> { @@ -107,7 +106,7 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { final String indexName = "test"; - clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1); + final ClusterState clusterState = stateWithAssignedPrimariesAndReplicas(new String[]{indexName}, randomIntBetween(2, 10), 1); final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final List tasks = IntStream.range(0, randomIntBetween(1, indexMetaData.getNumberOfShards())) @@ -123,7 +122,7 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { return new StartedShardEntry(shardId, allocationId, "test"); }).collect(Collectors.toList()); - assertTasksExecution(tasks, result -> { + assertTasksExecution(clusterState, tasks, result -> { assertSame(clusterState, result.resultingState); assertThat(result.executionResults.size(), equalTo(tasks.size())); tasks.forEach(task -> { @@ -135,7 +134,7 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { public void testStartedShards() throws Exception { final String indexName = "test"; - clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); @@ -149,7 +148,7 @@ public void testStartedShards() throws Exception { final String replicaAllocationId = replicaShard.allocationId().getId(); tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test")); } - assertTasksExecution(tasks, result -> { + assertTasksExecution(clusterState, tasks, result -> { assertNotSame(clusterState, result.resultingState); assertThat(result.executionResults.size(), equalTo(tasks.size())); tasks.forEach(task -> { @@ -164,7 +163,7 @@ public void testStartedShards() throws Exception { public void testDuplicateStartsAreOkay() throws Exception { final String indexName = "test"; - clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING); + final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING); final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); @@ -175,7 +174,7 @@ public void testDuplicateStartsAreOkay() throws Exception { .mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test")) .collect(Collectors.toList()); - assertTasksExecution(tasks, result -> { + assertTasksExecution(clusterState, tasks, result -> { assertNotSame(clusterState, result.resultingState); assertThat(result.executionResults.size(), equalTo(tasks.size())); tasks.forEach(task -> { @@ -188,9 +187,10 @@ public void testDuplicateStartsAreOkay() throws Exception { }); } - private void assertTasksExecution(final List tasks, + private void assertTasksExecution(final ClusterState state, + final List tasks, final Consumer consumer) throws Exception { - final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(clusterState, tasks); + final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(state, tasks); assertThat(result, notNullValue()); consumer.accept(result); } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 431f67c842d63..e94a974ae7a89 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -175,7 +175,7 @@ public void testSuccess() throws InterruptedException { transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); listener.await(); - assertTrue(listener.isSuccessful()); + assertNull(listener.failure.get()); } public void testNoMaster() throws InterruptedException { @@ -284,7 +284,7 @@ public void testUnhandledFailure() { final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); transport.handleRemoteError(capturedRequests[0].requestId, new TransportException("simulated")); - assertFalse(listener.isSuccessful()); + assertNotNull(listener.failure.get()); } public void testShardNotFound() throws InterruptedException { @@ -302,7 +302,7 @@ public void testShardNotFound() throws InterruptedException { transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); listener.await(); - assertTrue(listener.isSuccessful()); + assertNull(listener.failure.get()); } public void testNoLongerPrimaryShardException() throws InterruptedException { @@ -324,9 +324,11 @@ public void testNoLongerPrimaryShardException() throws InterruptedException { transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError); listener.await(); - assertNotNull(listener.getFailure()); - assertThat(listener.getFailure(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); - assertThat(listener.getFailure().getMessage(), equalTo(catastrophicError.getMessage())); + + final Exception failure = listener.failure.get(); + assertNotNull(failure); + assertThat(failure, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); + assertThat(failure.getMessage(), equalTo(catastrophicError.getMessage())); } public void testCacheRemoteShardFailed() throws Exception { @@ -422,13 +424,15 @@ public void testShardStarted() throws InterruptedException { shardStateAction.shardStarted(shardRouting, "testShardStarted", listener); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); - listener.await(); - assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); + ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request; assertThat(entry.shardId, equalTo(shardRouting.shardId())); assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); + + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + listener.await(); + assertNull(listener.failure.get()); } private ShardRouting getRandomShardRouting(String index) { @@ -584,14 +588,6 @@ public void onFailure(final Exception e) { } } - boolean isSuccessful() { - return getFailure() == null; - } - - Exception getFailure() { - return failure.get(); - } - void await() throws InterruptedException { latch.await(); } From 0d3442ae5b64aa52363eaa60171b5cb3d567afbf Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 25 Jan 2019 13:38:33 +0100 Subject: [PATCH 5/5] Fix imports --- .../action/shard/ShardFailedClusterStateTaskExecutorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 9ee0cf368ee5c..4dbe62cf5cebb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -48,7 +48,6 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays;