From 2cf4b139c5b212e76378d68ee0822c7395aa32ee Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 25 Jan 2019 17:17:54 +0100 Subject: [PATCH 1/4] Use primary terms to start shards --- .../action/shard/ShardStateAction.java | 54 +++++++++++++--- .../cluster/IndicesClusterStateService.java | 46 +++++++------- ...dStartedClusterStateTaskExecutorTests.java | 63 ++++++++++++++++--- .../action/shard/ShardStateActionTests.java | 61 ++++++++++++++++-- .../indices/cluster/ClusterStateChanges.java | 9 ++- 5 files changed, 188 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index e5c46cbb0ee08..67ff1b4f4ae0e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -494,12 +494,20 @@ public int hashCode() { } } - public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { - shardStarted(shardRouting, message, listener, clusterService.state()); + public void shardStarted(final ShardRouting shardRouting, + final long primaryTerm, + final String message, + final Listener listener) { + shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state()); } - public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) { - StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message); - sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener); + + public void shardStarted(final ShardRouting shardRouting, + final long primaryTerm, + final String message, + final Listener listener, + final ClusterState currentState) { + StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message); + sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener); } private static class ShardStartedTransportHandler implements TransportRequestHandler { @@ -544,6 +552,26 @@ public ClusterTasksResult execute(ClusterState currentState, List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates for (StartedShardEntry task : tasks) { + final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex()); + if (indexMetaData == null) { + // tasks that correspond to non-existent indices are marked as successful + logger.debug("{} ignoring shard started task [{}] (unknown index {})", task.shardId, task, task.shardId.getIndex()); + builder.success(task); + continue; + } else { + if (task.primaryTerm > 0) { + final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id()); + if (currentPrimaryTerm != task.primaryTerm) { + assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " + + "current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])"; + logger.debug("{} ignoring shard started task [{}] (primary term {} does not match current term {})", + task.shardId, task, task.primaryTerm, currentPrimaryTerm); + builder.success(task); + continue; + } + } + } + ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); if (matched == 
null) { // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started @@ -597,6 +625,7 @@ public void onFailure(String source, Exception e) { public static class StartedShardEntry extends TransportRequest { final ShardId shardId; final String allocationId; + final long primaryTerm; final String message; StartedShardEntry(StreamInput in) throws IOException { @@ -604,8 +633,12 @@ public static class StartedShardEntry extends TransportRequest { shardId = ShardId.readShardId(in); allocationId = in.readString(); if (in.getVersion().before(Version.V_6_3_0)) { - final long primaryTerm = in.readVLong(); + primaryTerm = in.readVLong(); assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]"; + } else if (in.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport + primaryTerm = in.readVLong(); + } else { + primaryTerm = UNASSIGNED_PRIMARY_TERM; } this.message = in.readString(); if (in.getVersion().before(Version.V_6_3_0)) { @@ -614,9 +647,10 @@ public static class StartedShardEntry extends TransportRequest { } } - public StartedShardEntry(ShardId shardId, String allocationId, String message) { + public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) { this.shardId = shardId; this.allocationId = allocationId; + this.primaryTerm = primaryTerm; this.message = message; } @@ -627,6 +661,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(allocationId); if (out.getVersion().before(Version.V_6_3_0)) { out.writeVLong(0L); + } else if (out.getVersion().onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport + out.writeVLong(primaryTerm); } out.writeString(message); if (out.getVersion().before(Version.V_6_3_0)) { @@ -636,8 +672,8 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}", - shardId, allocationId, message); + return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s]}", + shardId, allocationId, primaryTerm, message); } } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 80ac05ece8274..0b410af98fe92 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -575,13 +575,14 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR } try { - logger.debug("{} creating shard", shardRouting.shardId()); + final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id()); + logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm); RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode); indicesService.createShard( shardRouting, recoveryState, recoveryTargetService, - new RecoveryListener(shardRouting), + new RecoveryListener(shardRouting, primaryTerm), repositoriesService, failedShardHandler, globalCheckpointSyncer, @@ -618,25 +619,24 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard 
.collect(Collectors.toSet()); shard.updateShardState(shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), inSyncIds, indexShardRoutingTable, pre60AllocationIds); - } catch (Exception e) { - failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); - return; - } - final IndexShardState state = shard.state(); - if (shardRouting.initializing() && (state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY)) { - // the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting - // for master to confirm a shard started message (either master failover, or a cluster event before - // we managed to tell the master we started), mark us as started - if (logger.isTraceEnabled()) { - logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}", - shardRouting.shardId(), state, nodes.getMasterNode()); - } - if (nodes.getMasterNode() != null) { - shardStateAction.shardStarted(shardRouting, "master " + nodes.getMasterNode() + - " marked shard as initializing, but shard state is [" + state + "], mark shard as started", - SHARD_STATE_ACTION_LISTENER, clusterState); + final IndexShardState state = shard.state(); + if (shardRouting.initializing() && (state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY)) { + // the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting + // for master to confirm a shard started message (either master failover, or a cluster event before + // we managed to tell the master we started), mark us as started + if (logger.isTraceEnabled()) { + logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}", + shardRouting.shardId(), state, nodes.getMasterNode()); + } + if (nodes.getMasterNode() != null) { + shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() + + " marked shard as initializing, but shard state is [" + state + "], mark shard as started", + SHARD_STATE_ACTION_LISTENER, clusterState); + } } + } catch (Exception e) { + failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); } } @@ -674,14 +674,16 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, Routin private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener { private final ShardRouting shardRouting; + private final long primaryTerm; - private RecoveryListener(ShardRouting shardRouting) { + private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) { this.shardRouting = shardRouting; + this.primaryTerm = primaryTerm; } @Override - public void onRecoveryDone(RecoveryState state) { - shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + public void onRecoveryDone(final RecoveryState state) { + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index 1d3a523cdc94f..f3a9594999412 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -69,7 +69,7 @@ public void testEmptyTaskListProducesSameClusterState() throws Exception { public void testNonExistentIndexMarkedAsSuccessful() throws Exception { final ClusterState clusterState = stateWithNoShard(); - final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", "test"); + final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", randomNonNegativeLong(), "test"); assertTasksExecution(clusterState, singletonList(entry), result -> { assertSame(clusterState, result.resultingState); @@ -87,10 +87,10 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { final List tasks = Stream.concat( // Existent shard id but different allocation id IntStream.range(0, randomIntBetween(1, 5)) - .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), "allocation id")), + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), 0), String.valueOf(i), 0L, "allocation id")), // Non existent shard id IntStream.range(1, randomIntBetween(2, 5)) - .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), "shard id")) + .mapToObj(i -> new StartedShardEntry(new ShardId(indexMetaData.getIndex(), i), String.valueOf(i), 0L, "shard id")) ).collect(Collectors.toList()); @@ -119,7 +119,8 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { } else { allocationId = shardRoutingTable.replicaShards().iterator().next().allocationId().getId(); } - return new StartedShardEntry(shardId, allocationId, "test"); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); + return new StartedShardEntry(shardId, allocationId, primaryTerm, "test"); }).collect(Collectors.toList()); assertTasksExecution(clusterState, tasks, result -> { @@ -138,15 +139,16 @@ public void testStartedShards() throws Exception { final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); final String primaryAllocationId = primaryShard.allocationId().getId(); final List tasks = new ArrayList<>(); - tasks.add(new StartedShardEntry(shardId, primaryAllocationId, "test")); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "test")); if (randomBoolean()) { final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); final String replicaAllocationId = replicaShard.allocationId().getId(); - tasks.add(new StartedShardEntry(shardId, replicaAllocationId, "test")); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test")); } assertTasksExecution(clusterState, tasks, result -> { assertNotSame(clusterState, result.resultingState); @@ -169,9 +171,10 @@ public void testDuplicateStartsAreOkay() throws Exception { final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); final ShardRouting shardRouting = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); final String allocationId = shardRouting.allocationId().getId(); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); final List tasks = IntStream.range(0, 
randomIntBetween(2, 10)) - .mapToObj(i -> new StartedShardEntry(shardId, allocationId, "test")) + .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test")) .collect(Collectors.toList()); assertTasksExecution(clusterState, tasks, result -> { @@ -187,6 +190,52 @@ public void testDuplicateStartsAreOkay() throws Exception { }); } + public void testPrimaryTermsMismatch() throws Exception { + final String indexName = "test"; + final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); + final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); + final String primaryAllocationId = primaryShard.allocationId().getId(); + final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); + final String replicaAllocationId = replicaShard.allocationId().getId(); + + final List tasks = new ArrayList<>(); + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm - 1, "primary terms does not match on primary")); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm - 1, "primary terms does not match on replica")); + + assertTasksExecution(clusterState, tasks, result -> { + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(2)); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.INITIALIZING)); + assertSame(clusterState, result.resultingState); + }); + }); + + tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "primary terms match on primary")); + tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "primary terms match on replica")); + + assertTasksExecution(clusterState, tasks, result -> { + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(4)); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + assertNotSame(clusterState, result.resultingState); + }); + }); + } + private void assertTasksExecution(final ClusterState state, final List tasks, final Consumer consumer) throws Exception { diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index e94a974ae7a89..a800c0c79929c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -72,12 +72,14 @@ import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; public class ShardStateActionTests extends ESTestCase { @@ -420,8 +422,9 @@ public void testShardStarted() throws InterruptedException { setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); final ShardRouting shardRouting = getRandomShardRouting(index); + final long primaryTerm = clusterService.state().metaData().index(shardRouting.index()).primaryTerm(shardRouting.id()); final TestListener listener = new TestListener(); - shardStateAction.shardStarted(shardRouting, "testShardStarted", listener); + shardStateAction.shardStarted(shardRouting, primaryTerm, "testShardStarted", listener); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests[0].request, instanceOf(ShardStateAction.StartedShardEntry.class)); @@ -429,6 +432,7 @@ public void testShardStarted() throws InterruptedException { ShardStateAction.StartedShardEntry entry = (ShardStateAction.StartedShardEntry) capturedRequests[0].request; assertThat(entry.shardId, equalTo(shardRouting.shardId())); assertThat(entry.allocationId, equalTo(shardRouting.allocationId().getId())); + assertThat(entry.primaryTerm, equalTo(primaryTerm)); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); listener.await(); @@ -481,7 +485,7 @@ public void testShardEntryBWCSerialize() throws Exception { final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); final String reason = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); - try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, reason), bwcVersion).streamInput()) { + try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, 0L, reason), bwcVersion).streamInput()) { in.setVersion(bwcVersion); final FailedShardEntry failedShardEntry = new FailedShardEntry(in); assertThat(failedShardEntry.shardId, equalTo(shardId)); @@ -490,8 +494,7 @@ public void testShardEntryBWCSerialize() throws Exception { assertThat(failedShardEntry.failure, nullValue()); assertThat(failedShardEntry.markAsStale, equalTo(true)); } - try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, - reason, null, false), bwcVersion).streamInput()) { + try (StreamInput in = serialize(new FailedShardEntry(shardId, allocationId, 0L, reason, null, false), bwcVersion).streamInput()) { in.setVersion(bwcVersion); final StartedShardEntry startedShardEntry = new StartedShardEntry(in); assertThat(startedShardEntry.shardId, equalTo(shardId)); @@ -500,6 +503,56 @@ public void testShardEntryBWCSerialize() throws Exception { } } + public void 
testFailedShardEntrySerialization() throws Exception { + final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); + final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final long primaryTerm = randomIntBetween(0, 100); + final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final Exception failure = randomBoolean() ? null : getSimulatedFailure(); + final boolean markAsStale = randomBoolean(); + + final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT)); + final FailedShardEntry failedShardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); + try (StreamInput in = serialize(failedShardEntry, version).streamInput()) { + in.setVersion(version); + final FailedShardEntry deserialized = new FailedShardEntry(in); + assertThat(deserialized.shardId, equalTo(shardId)); + assertThat(deserialized.allocationId, equalTo(allocationId)); + assertThat(deserialized.primaryTerm, equalTo(primaryTerm)); + assertThat(deserialized.message, equalTo(message)); + if (failure != null) { + assertThat(deserialized.failure, notNullValue()); + assertThat(deserialized.failure.getClass(), equalTo(failure.getClass())); + assertThat(deserialized.failure.getMessage(), equalTo(failure.getMessage())); + } else { + assertThat(deserialized.failure, nullValue()); + } + assertThat(deserialized.markAsStale, equalTo(markAsStale)); + assertEquals(failedShardEntry, deserialized); + } + } + + public void testStartedShardEntrySerialization() throws Exception { + final ShardId shardId = new ShardId(randomRealisticUnicodeOfLengthBetween(10, 100), UUID.randomUUID().toString(), between(0, 1000)); + final String allocationId = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + final long primaryTerm = randomIntBetween(0, 100); + final String message = randomRealisticUnicodeOfCodepointLengthBetween(10, 100); + + final Version version = randomFrom(randomCompatibleVersion(random(), Version.CURRENT)); + try (StreamInput in = serialize(new StartedShardEntry(shardId, allocationId, primaryTerm, message), version).streamInput()) { + in.setVersion(version); + final StartedShardEntry deserialized = new StartedShardEntry(in); + assertThat(deserialized.shardId, equalTo(shardId)); + assertThat(deserialized.allocationId, equalTo(allocationId)); + if (version.onOrAfter(Version.V_7_0_0)) { // TODO update version to 6.7.0 after backport + assertThat(deserialized.primaryTerm, equalTo(primaryTerm)); + } else { + assertThat(deserialized.primaryTerm, equalTo(0L)); + } + assertThat(deserialized.message, equalTo(message)); + } + } + BytesReference serialize(Writeable writeable, Version version) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setVersion(version); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 8a00be28f5eb2..a66ef094000ee 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -277,9 +277,12 @@ public ClusterState applyFailedShards(ClusterState clusterState, List startedShards) { - List entries = startedShards.stream().map(startedShard -> - new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), "shard started")) - 
.collect(Collectors.toList()); + List entries = startedShards.stream() + .map(startedShard -> { + final IndexMetaData indexMetaData = clusterState.metaData().index(startedShard.shardId().getIndex()); + final long primaryTerm = indexMetaData != null ? indexMetaData.primaryTerm(startedShard.shardId().id()) : 0L; + return new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), primaryTerm, "shard started"); + }).collect(Collectors.toList()); return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries); } From ebfa5b5ed8ea589330b993f01e251b6d54c912ab Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 28 Jan 2019 10:56:48 +0100 Subject: [PATCH 2/4] Apply feedback --- .../action/shard/ShardStateAction.java | 23 +++++----- .../cluster/IndicesClusterStateService.java | 43 +++++++++++-------- ...actIndicesClusterStateServiceTestCase.java | 9 +++- .../indices/cluster/ClusterStateChanges.java | 17 +++++--- ...ClusterStateServiceRandomUpdatesTests.java | 4 +- 5 files changed, 58 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 67ff1b4f4ae0e..a7677bfb15f24 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -558,8 +558,18 @@ public ClusterTasksResult execute(ClusterState currentState, logger.debug("{} ignoring shard started task [{}] (unknown index {})", task.shardId, task, task.shardId.getIndex()); builder.success(task); continue; + } + + final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); + if (matched == null) { + // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started + // events on every cluster state publishing that does not contain the shard as started yet. This means that old stale + // requests might still be in flight even after the shard has already been started or failed on the master. We just + // ignore these requests for now. + logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task); + builder.success(task); } else { - if (task.primaryTerm > 0) { + if (matched.primary() && task.primaryTerm > 0) { final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id()); if (currentPrimaryTerm != task.primaryTerm) { assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " + @@ -570,17 +580,6 @@ public ClusterTasksResult execute(ClusterState currentState, continue; } } - } - - ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId); - if (matched == null) { - // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started - // events on every cluster state publishing that does not contain the shard as started yet. This means that old stale - // requests might still be in flight even after the shard has already been started or failed on the master. We just - // ignore these requests for now. 
- logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task); - builder.success(task); - } else { if (matched.initializing() == false) { assert matched.active() : "expected active shard routing for task " + task + " but found " + matched; // same as above, this might have been a stale in-flight request, so we just ignore. diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 0b410af98fe92..5955a749fea34 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -599,9 +599,10 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard "local shard has a different allocation id but wasn't cleaning by removeShards. " + "cluster state: " + shardRouting + " local: " + currentRoutingEntry; + final long primaryTerm; try { final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex()); - final long primaryTerm = indexMetaData.primaryTerm(shard.shardId().id()); + primaryTerm = indexMetaData.primaryTerm(shard.shardId().id()); final Set inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id()); final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); final Set pre60AllocationIds = indexShardRoutingTable.assignedShards() @@ -619,24 +620,25 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard .collect(Collectors.toSet()); shard.updateShardState(shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(), inSyncIds, indexShardRoutingTable, pre60AllocationIds); - - final IndexShardState state = shard.state(); - if (shardRouting.initializing() && (state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY)) { - // the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting - // for master to confirm a shard started message (either master failover, or a cluster event before - // we managed to tell the master we started), mark us as started - if (logger.isTraceEnabled()) { - logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}", - shardRouting.shardId(), state, nodes.getMasterNode()); - } - if (nodes.getMasterNode() != null) { - shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() + - " marked shard as initializing, but shard state is [" + state + "], mark shard as started", - SHARD_STATE_ACTION_LISTENER, clusterState); - } - } } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState); + return; + } + + final IndexShardState state = shard.state(); + if (shardRouting.initializing() && (state == IndexShardState.STARTED || state == IndexShardState.POST_RECOVERY)) { + // the master thinks we are initializing, but we are already started or on POST_RECOVERY and waiting + // for master to confirm a shard started message (either master failover, or a cluster event before + // we managed to tell the master we started), mark us as started + if (logger.isTraceEnabled()) { + logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}", + shardRouting.shardId(), state, 
nodes.getMasterNode()); + } + if (nodes.getMasterNode() != null) { + shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() + + " marked shard as initializing, but shard state is [" + state + "], mark shard as started", + SHARD_STATE_ACTION_LISTENER, clusterState); + } } } @@ -673,7 +675,14 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, Routin private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener { + /** + * ShardRouting with which the shard was created + */ private final ShardRouting shardRouting; + + /** + * Primary term with which the shard was created + */ private final long primaryTerm; private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index f248d46b11744..9b6cae43081ad 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -361,11 +361,14 @@ public void updateShardState(ShardRouting shardRouting, assertThat(this.shardId(), equalTo(shardRouting.shardId())); assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting)); if (this.shardRouting.active()) { - assertTrue("and active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting, + assertTrue("an active shard must stay active, current: " + this.shardRouting + ", got: " + shardRouting, shardRouting.active()); } if (this.shardRouting.primary()) { assertTrue("a primary shard can't be demoted", shardRouting.primary()); + if (this.shardRouting.initializing()) { + assertEquals("primary term can not be updated on an initializing primary shard: " + shardRouting, term, newPrimaryTerm); + } } else if (shardRouting.primary()) { // note: it's ok for a replica in post recovery to be started and promoted at once // this can happen when the primary failed after we sent the start shard message @@ -390,6 +393,10 @@ public IndexShardState state() { return null; } + public long term() { + return term; + } + public void updateTerm(long newTerm) { assertThat("term can only be incremented: " + shardRouting, newTerm, greaterThanOrEqualTo(term)); if (shardRouting.primary() && shardRouting.active()) { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index a66ef094000ee..c1e32be9d29af 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -277,13 +277,18 @@ public ClusterState applyFailedShards(ClusterState clusterState, List startedShards) { - List entries = startedShards.stream() - .map(startedShard -> { + final Map entries = startedShards.stream() + .collect(Collectors.toMap(Function.identity(), startedShard -> { final IndexMetaData indexMetaData = clusterState.metaData().index(startedShard.shardId().getIndex()); - final long primaryTerm = indexMetaData != null ? 
indexMetaData.primaryTerm(startedShard.shardId().id()) : 0L; - return new StartedShardEntry(startedShard.shardId(), startedShard.allocationId().getId(), primaryTerm, "shard started"); - }).collect(Collectors.toList()); - return runTasks(shardStartedClusterStateTaskExecutor, clusterState, entries); + return indexMetaData != null ? indexMetaData.primaryTerm(startedShard.shardId().id()) : 0L; + })); + return applyStartedShards(clusterState, entries); + } + + public ClusterState applyStartedShards(ClusterState clusterState, Map startedShards) { + return runTasks(shardStartedClusterStateTaskExecutor, clusterState, startedShards.entrySet().stream() + .map(e -> new StartedShardEntry(e.getKey().shardId(), e.getKey().allocationId().getId(), e.getValue(), "shard started")) + .collect(Collectors.toList())); } private ClusterState runTasks(ClusterStateTaskExecutor executor, ClusterState clusterState, List entries) { diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index b400b56b34d55..e664cc87452fc 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -384,7 +384,7 @@ public ClusterState randomlyUpdateClusterState(ClusterState state, } // randomly start and fail allocated shards - List startedShards = new ArrayList<>(); + final Map startedShards = new HashMap<>(); List failedShards = new ArrayList<>(); for (DiscoveryNode node : state.nodes()) { IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); @@ -393,7 +393,7 @@ public ClusterState randomlyUpdateClusterState(ClusterState state, for (MockIndexShard indexShard : indexService) { ShardRouting persistedShardRouting = indexShard.routingEntry(); if (persistedShardRouting.initializing() && randomBoolean()) { - startedShards.add(persistedShardRouting); + startedShards.put(persistedShardRouting, indexShard.term()); } else if (rarely()) { failedShards.add(new FailedShard(persistedShardRouting, "fake shard failure", new Exception(), randomBoolean())); } From 8c19deaf765aa24343f239867ce15002de4f0d91 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 28 Jan 2019 14:35:36 +0100 Subject: [PATCH 3/4] Adapt test --- ...dStartedClusterStateTaskExecutorTests.java | 162 +++++++++--------- 1 file changed, 82 insertions(+), 80 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index f3a9594999412..fe36b1588b5cc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -34,7 +34,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -64,19 +63,19 @@ public void setUp() throws Exception { public void testEmptyTaskListProducesSameClusterState() throws Exception { final ClusterState clusterState = stateWithNoShard(); - assertTasksExecution(clusterState, 
Collections.emptyList(), result -> assertSame(clusterState, result.resultingState)); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, Collections.emptyList()); + assertSame(clusterState, result.resultingState); } public void testNonExistentIndexMarkedAsSuccessful() throws Exception { final ClusterState clusterState = stateWithNoShard(); final StartedShardEntry entry = new StartedShardEntry(new ShardId("test", "_na", 0), "aId", randomNonNegativeLong(), "test"); - assertTasksExecution(clusterState, singletonList(entry), - result -> { - assertSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(1)); - assertThat(result.executionResults.containsKey(entry), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true)); - }); + + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(entry)); + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(entry), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(entry)).isSuccess(), is(true)); } public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { @@ -94,13 +93,12 @@ public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { ).collect(Collectors.toList()); - assertTasksExecution(clusterState, tasks, result -> { - assertSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - }); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); }); } @@ -123,13 +121,12 @@ public void testNonInitializingShardAreMarkedAsSuccessful() throws Exception { return new StartedShardEntry(shardId, allocationId, primaryTerm, "test"); }).collect(Collectors.toList()); - assertTasksExecution(clusterState, tasks, result -> { - assertSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - }); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); }); } @@ -150,16 +147,15 @@ public void testStartedShards() throws Exception { final String replicaAllocationId = replicaShard.allocationId().getId(); tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "test")); } - 
assertTasksExecution(clusterState, tasks, result -> { - assertNotSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - - final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); - assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); - }); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); }); } @@ -177,70 +173,76 @@ public void testDuplicateStartsAreOkay() throws Exception { .mapToObj(i -> new StartedShardEntry(shardId, allocationId, primaryTerm, "test")) .collect(Collectors.toList()); - assertTasksExecution(clusterState, tasks, result -> { - assertNotSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(tasks.size())); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - - final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); - assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); - }); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, tasks); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(tasks.size())); + tasks.forEach(task -> { + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + + final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); }); } public void testPrimaryTermsMismatch() throws Exception { final String indexName = "test"; - final ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); - + ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); - final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard(); - final String primaryAllocationId = primaryShard.allocationId().getId(); - final ShardRouting replicaShard = 
clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next(); - final String replicaAllocationId = replicaShard.allocationId().getId(); + final String primaryAllocationId = clusterState.routingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId(); + { - final List tasks = new ArrayList<>(); - tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm - 1, "primary terms does not match on primary")); - tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm - 1, "primary terms does not match on replica")); + final StartedShardEntry task = + new StartedShardEntry(shardId, primaryAllocationId, primaryTerm -1, "primary terms does not match on primary"); - assertTasksExecution(clusterState, tasks, result -> { + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(2)); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - - final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); - assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.INITIALIZING)); - assertSame(clusterState, result.resultingState); - }); - }); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.INITIALIZING)); + assertSame(clusterState, result.resultingState); + } + { + final StartedShardEntry task = + new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "primary terms match on primary"); - tasks.add(new StartedShardEntry(shardId, primaryAllocationId, primaryTerm, "primary terms match on primary")); - tasks.add(new StartedShardEntry(shardId, replicaAllocationId, primaryTerm, "primary terms match on replica")); + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); + assertNotSame(clusterState, result.resultingState); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + assertNotSame(clusterState, result.resultingState); + clusterState = result.resultingState; + } + { + final long replicaPrimaryTerm = randomBoolean() ? 
primaryTerm : primaryTerm - 1; + final String replicaAllocationId = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().iterator().next() + .allocationId().getId(); + + final StartedShardEntry task = new StartedShardEntry(shardId, replicaAllocationId, replicaPrimaryTerm, "test on replica"); - assertTasksExecution(clusterState, tasks, result -> { + final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertNotSame(clusterState, result.resultingState); - assertThat(result.executionResults.size(), equalTo(4)); - tasks.forEach(task -> { - assertThat(result.executionResults.containsKey(task), is(true)); - assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); - - final IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); - assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); - assertNotSame(clusterState, result.resultingState); - }); - }); + assertThat(result.executionResults.size(), equalTo(1)); + assertThat(result.executionResults.containsKey(task), is(true)); + assertThat(((ClusterStateTaskExecutor.TaskResult) result.executionResults.get(task)).isSuccess(), is(true)); + IndexShardRoutingTable shardRoutingTable = result.resultingState.routingTable().shardRoutingTable(task.shardId); + assertThat(shardRoutingTable.getByAllocationId(task.allocationId).state(), is(ShardRoutingState.STARTED)); + assertNotSame(clusterState, result.resultingState); + } } - private void assertTasksExecution(final ClusterState state, - final List tasks, - final Consumer consumer) throws Exception { + private ClusterStateTaskExecutor.ClusterTasksResult executeTasks(final ClusterState state, + final List tasks) throws Exception { final ClusterStateTaskExecutor.ClusterTasksResult result = executor.execute(state, tasks); assertThat(result, notNullValue()); - consumer.accept(result); + return result; } } From 2d39807654f9183c06b0befd925494d5313dfc47 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 28 Jan 2019 17:50:59 +0100 Subject: [PATCH 4/4] Move IndexMetadata lookup and fix test when primary term == 1 --- .../action/shard/ShardStateAction.java | 10 ++-------- ...dStartedClusterStateTaskExecutorTests.java | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index a7677bfb15f24..4419d921a3b4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -552,14 +552,6 @@ public ClusterTasksResult execute(ClusterState currentState, List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); Set seenShardRoutings = new HashSet<>(); // to prevent duplicates for (StartedShardEntry task : tasks) { - final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex()); - if (indexMetaData == null) { - // tasks that correspond to non-existent indices are marked as successful - logger.debug("{} ignoring shard started task [{}] (unknown index {})", task.shardId, task, task.shardId.getIndex()); - builder.success(task); - continue; - } - final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, 
task.allocationId); if (matched == null) { // tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started @@ -570,6 +562,8 @@ public ClusterTasksResult execute(ClusterState currentState, builder.success(task); } else { if (matched.primary() && task.primaryTerm > 0) { + final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex()); + assert indexMetaData != null; final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id()); if (currentPrimaryTerm != task.primaryTerm) { assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " + diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index fe36b1588b5cc..20b7548004f4a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -187,16 +188,22 @@ public void testDuplicateStartsAreOkay() throws Exception { public void testPrimaryTermsMismatch() throws Exception { final String indexName = "test"; - ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); - final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); - final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0); + final int shard = 0; + final int primaryTerm = 2 + randomInt(200); - final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); + ClusterState clusterState = state(indexName, randomBoolean(), ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING); + clusterState = ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .put(IndexMetaData.builder(clusterState.metaData().index(indexName)) + .primaryTerm(shard, primaryTerm) + .build(), true) + .build()) + .build(); + final ShardId shardId = new ShardId(clusterState.metaData().index(indexName).getIndex(), shard); final String primaryAllocationId = clusterState.routingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId(); { - final StartedShardEntry task = - new StartedShardEntry(shardId, primaryAllocationId, primaryTerm -1, "primary terms does not match on primary"); + new StartedShardEntry(shardId, primaryAllocationId, primaryTerm - 1, "primary terms does not match on primary"); final ClusterStateTaskExecutor.ClusterTasksResult result = executeTasks(clusterState, singletonList(task)); assertSame(clusterState, result.resultingState);
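
For readers skimming the patch series above, the sketch below distills the master-side behaviour the final patch converges on. It is a simplified, self-contained Java illustration: the class and field names (`StartedShardTermCheckSketch`, `StartedTask`, `KnownShard`, `resolve`) are placeholders and deliberately do not use the real Elasticsearch types (`ShardStateAction.StartedShardEntry`, `IndexMetaData`, `ShardRouting`). It only shows the decision logic added by the patches: a shard-started task is ignored when its routing entry no longer exists, when it targets a primary and carries a primary term that no longer matches the term recorded in the cluster state, or when the shard is no longer initializing.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the master-side checks added by the patch series above.
// All types here are stand-ins, not the actual Elasticsearch classes.
public class StartedShardTermCheckSketch {

    /** Outcome for a single shard-started task. */
    enum Decision { APPLY, IGNORE_UNKNOWN_SHARD, IGNORE_STALE_PRIMARY_TERM, IGNORE_ALREADY_STARTED }

    /** Minimal stand-in for StartedShardEntry: shard id, allocation id, primary term. */
    static final class StartedTask {
        final int shardId;
        final String allocationId;
        final long primaryTerm; // 0 means "unknown", e.g. sent by a node on an older version

        StartedTask(int shardId, String allocationId, long primaryTerm) {
            this.shardId = shardId;
            this.allocationId = allocationId;
            this.primaryTerm = primaryTerm;
        }
    }

    /** Minimal stand-in for the routing entry the master currently knows about. */
    static final class KnownShard {
        final boolean primary;
        final boolean initializing;
        final long currentPrimaryTerm; // what the cluster state records for this shard

        KnownShard(boolean primary, boolean initializing, long currentPrimaryTerm) {
            this.primary = primary;
            this.initializing = initializing;
            this.currentPrimaryTerm = currentPrimaryTerm;
        }
    }

    /**
     * Mirrors the order of checks in the final patch: resolve the routing entry first,
     * then (for primaries only, and only when a term was actually sent) compare the
     * task's primary term with the current one, then skip shards that already started.
     */
    static Decision resolve(StartedTask task, KnownShard matched) {
        if (matched == null) {
            // stale in-flight request for a shard that no longer exists: marked successful, ignored
            return Decision.IGNORE_UNKNOWN_SHARD;
        }
        if (matched.primary && task.primaryTerm > 0 && matched.currentPrimaryTerm != task.primaryTerm) {
            // the primary was reassigned (higher term) after this message was sent: ignore it
            return Decision.IGNORE_STALE_PRIMARY_TERM;
        }
        if (matched.initializing == false) {
            // duplicate start for an already-started shard: also ignored
            return Decision.IGNORE_ALREADY_STARTED;
        }
        return Decision.APPLY;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        log.add("stale primary start:   " + resolve(new StartedTask(0, "p-aid", 1L), new KnownShard(true, true, 2L)));
        log.add("current primary start: " + resolve(new StartedTask(0, "p-aid", 2L), new KnownShard(true, true, 2L)));
        log.add("replica start (term not checked): " + resolve(new StartedTask(0, "r-aid", 1L), new KnownShard(false, true, 2L)));
        log.forEach(System.out::println);
    }
}
```

The wire-format side of the change follows the usual backwards-compatibility pattern visible in the patch: `StartedShardEntry` writes `primaryTerm` only when the destination version is on or after the gating version (marked `V_7_0_0` with a TODO to lower it to 6.7.0 after backport), and readers on older versions fall back to `UNASSIGNED_PRIMARY_TERM`, which is why the executor only enforces the term check when `task.primaryTerm > 0`.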