From 62ce29b757a4da008bf13b4051f208940b84534e Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Thu, 27 Sep 2018 17:50:27 +0200 Subject: [PATCH 01/25] Put a fake allocation id on allocate stale primary command; remove it after recovery is done Relates to #33432 --- .../cluster/routing/IndexRoutingTable.java | 3 ++- .../cluster/routing/RecoverySource.java | 1 + .../allocation/IndexMetaDataUpdater.java | 18 +++++++++++++++--- .../cluster/routing/PrimaryAllocationIT.java | 10 +++++++++- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index e9d805d34c8a1..c3eb217003e60 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -141,7 +141,8 @@ boolean validate(MetaData metaData) { if (shardRouting.primary() && shardRouting.initializing() && shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE && - inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) + inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false && + inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false) throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " + "a known allocation id but has no corresponding entry in the in-sync " + "allocation set " + inSyncAllocationIds); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index b7cc95298c49e..09be960dff20b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ 
-132,6 +132,7 @@ public String toString() { * Recovery from an existing on-disk store */ public static final class ExistingStoreRecoverySource extends RecoverySource { + public static final String FORCED_ALLOCATION_ID = "_forced_allocation"; public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false); public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index 6d4ca7dc77524..8de8e4460bca8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -69,6 +69,13 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali @Override public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { addAllocationId(startedShard); + if (startedShard.primary() + // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state + && (initializingShard.recoverySource() instanceof RecoverySource.ExistingStoreRecoverySource + || initializingShard.recoverySource() instanceof RecoverySource.SnapshotRecoverySource)) { + Updates updates = changes(startedShard.shardId()); + updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); + } } @Override @@ -144,7 +151,8 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) { // we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating // an empty or a stale primary (see 
AllocateEmptyPrimaryAllocationCommand or AllocateStalePrimaryAllocationCommand). - RecoverySource.Type recoverySourceType = updates.initializedPrimary.recoverySource().getType(); + RecoverySource recoverySource = updates.initializedPrimary.recoverySource(); + RecoverySource.Type recoverySourceType = recoverySource.getType(); boolean emptyPrimary = recoverySourceType == RecoverySource.Type.EMPTY_STORE; assert updates.addedAllocationIds.isEmpty() : (emptyPrimary ? "empty" : "stale") + " primary is not force-initialized in same allocation round where shards are started"; @@ -156,9 +164,12 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab // forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate) indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet()); } else { + assert recoverySource instanceof RecoverySource.ExistingStoreRecoverySource + || recoverySource instanceof RecoverySource.SnapshotRecoverySource + : recoverySource; // forcing a stale primary resets the in-sync allocations to the singleton set with the stale id indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), - Collections.singleton(updates.initializedPrimary.allocationId().getId())); + Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)); } } else { // standard path for updating in-sync ids @@ -291,7 +302,8 @@ void removeAllocationId(ShardRouting shardRouting) { * Add allocation id of this shard to the set of in-sync shard copies */ private void addAllocationId(ShardRouting shardRouting) { - changes(shardRouting.shardId()).addedAllocationIds.add(shardRouting.allocationId().getId()); + final Updates changes = changes(shardRouting.shardId()); + changes.addedAllocationIds.add(shardRouting.allocationId().getId()); } /** diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java 
b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 9b2db5b34b1da..4f789a95b1a8e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -204,6 +204,13 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { } rerouteBuilder.get(); + ClusterState state = client().admin().cluster().prepareState().get().getState(); + + Set expectedAllocationIds = useStaleReplica + ? Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) + : Collections.emptySet(); + assertEquals(expectedAllocationIds, state.metaData().index(idxName).inSyncAllocationIds(0)); + logger.info("--> check that the stale primary shard gets allocated and that documents are available"); ensureYellow(idxName); @@ -218,7 +225,8 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 
1L : 0L); // allocation id of old primary was cleaned from the in-sync set - ClusterState state = client().admin().cluster().prepareState().get().getState(); + state = client().admin().cluster().prepareState().get().getState(); + assertEquals(Collections.singleton(state.routingTable().index(idxName).shard(0).primary.allocationId().getId()), state.metaData().index(idxName).inSyncAllocationIds(0)); From 3185235badcc6ae62c4d55086cd7f94fb25a0ec4 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Sat, 29 Sep 2018 21:16:06 +0200 Subject: [PATCH 02/25] added a test case to spot the problem if allocation id is adjusted before recovery is done (historyUUID is adjusted): --- .../cluster/routing/AllocationIdIT.java | 341 ++++++++++++++++++ .../RemoveCorruptedShardDataCommandIT.java | 15 +- 2 files changed, 351 insertions(+), 5 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java new file mode 100644 index 0000000000000..b0e84cb54cb94 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -0,0 +1,341 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.SimpleFSDirectory; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cli.MockTerminal; +import org.elasticsearch.cli.Terminal; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.AllocationDecision; +import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; +import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand; +import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.Store; +import 
org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.CorruptionUtils; +import org.elasticsearch.test.DummyShardLock; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.engine.MockEngineSupport; +import org.elasticsearch.test.transport.MockTransportService; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isIn; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) +public class AllocationIdIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class); + } + + public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Exception { + /** + * test case to spot the problem if allocation id is adjusted before recovery is done (historyUUID is adjusted): + * + * - start node1 (primary), index some docs, start node2 (replica) all docs are replicated; same historyUUID + * - stop node2 (replica) + * - corrupt index on 
node1, drop corrupted parts it with a corruption tool but don't change historyUUID + * (if we start both node1 and node2 no any recovery would be done due to same historyUUID while node1 has less docs than node2) + * after corruption tool it is required AllocateStalePrimary to be called. + * + * we'd like to disrupt the recovery (local) process: + * - put a fake corruption marker on node1 to disrupt the recovery + * - start node - allocation id is adjusted (*) - recovery failed and historyUUID is not changed + * - remove a fake corruption marker from node1 and start it again + * - the interruption on recovery has the effect close to AllocateStalePrimary is ignored if a real allocation + * id in (*) is persisted before recovery is done (and historyUUID is changed) + * - index is RED as shard allocation id does not match persisted at (*) allocation id (_forced_allocation) + * - AllocateStalePrimary has to be called again + * it fails if a shard allocation id at (*) is persisted + * - index turns to YELLOW + * - start replica -> after (a full) recovery index turns GREEN and all shards have the same number of docs ( numDocs - corrupted ) + * no recovery take place if a shard allocation id at (*) is persisted => + * => nodes are fully in-sync but have diff number of docs + */ + + // initial set up + final String indexName = "index42"; + final String node1 = internalCluster().startNode(); + final int numDocs = indexDocs(indexName); + final IndexSettings indexSettings = getIndexSettings(indexName); + final String primaryNodeId = getNodeIdByName(node1); + final Set allocationIds = getAllocationIds(indexName); + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final Path indexPath = getIndexPath(node1, shardId); + assertThat(allocationIds, hasSize(1)); + final Set historyUUIDs = historyUUIDs(node1, indexName); + final String node2 = internalCluster().startNode(); + ensureGreen(indexName); + assertSameDocIdsOnShards(); + // initial set up is done + + 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2)); + + final int expectedNumDocs = corruptIndexAndGetExpectedNumDocs(node1, indexName, numDocs, shardId, indexPath); + + // start nodes in the same order to pick up same data folders; same as node1 + final String node3 = internalCluster().startNode(); + + // there is only _stale_ primary (due to new allocation id) + checkNoValidShardCopy(indexName, shardId); + + // create fake corrupted marker + putFakeCorruptionMarker(indexSettings, shardId, indexPath); + + // allocate stale primary + client().admin().cluster().prepareReroute() + .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true)) + .get(); + + // no valid shard copy as shard failed due to corruption marker + checkNoValidShardCopy(indexName, shardId); + + // restart node1 and remove fake corruption marker to be able to start shard + internalCluster().restartNode(node3, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) { + store.removeCorruptionMarker(); + } + return super.onNodeStopped(nodeName); + } + }); + + // check that allocation id is changed + assertBusy(() -> { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + final MetaData indexMetaData = clusterState.metaData(); + final IndexMetaData index = indexMetaData.index(indexName); + assertThat(index, notNullValue()); + final Set newAllocationIds = index.inSyncAllocationIds(0); + assertThat(newAllocationIds, hasSize(1)); + assertThat(newAllocationIds, everyItem(not(isIn(allocationIds)))); + }); + + // that we can have in case of put a real (not a fake) allocation id: + // + // ensureYellow(indexName); + // Set newHistoryUUIds = historyUUIDs(indexName); + // assertThat(newHistoryUUIds, equalTo(historyUUIDs)); + + // index has to 
red: no any shard is allocated (allocation id is a fake id that does not match to anything) + final ClusterHealthStatus indexHealthStatus = client().admin().cluster() + .health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus(); + assertThat(indexHealthStatus, is(ClusterHealthStatus.RED)); + + // no any valid shard is there; have to invoke AllocateStalePrimary again + client().admin().cluster().prepareReroute() + .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true)) + .get(); + + ensureYellow(indexName); + + // node4 uses same data folder as node2 + final String node4 = internalCluster().startNode(); + + // wait for the replica is fully recovered + ensureGreen(indexName); + + // check that historyUUID is changed + assertThat(historyUUIDs(node3, indexName), everyItem(not(isIn(historyUUIDs)))); + assertThat(historyUUIDs(node4, indexName), everyItem(not(isIn(historyUUIDs)))); + + // both primary and replica have the same number of docs + assertHitCount(client(node3).prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs); + // otherwise - replica has not been recovered and it has more docs than primary + assertHitCount(client(node4).prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs); + assertSameDocIdsOnShards(); + } + + private int indexDocs(String indexName) throws InterruptedException, ExecutionException { + assertAcked(prepareCreate(indexName) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") + .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))); + // index some docs in several segments + int numDocs = 0; + for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) { + final int 
numExtraDocs = between(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar"); + } + + numDocs += numExtraDocs; + + indexRandom(false, false, false, Arrays.asList(builders)); + flush(indexName); + } + + return numDocs; + } + + private String getNodeIdByName(String nodeName) { + String primaryNodeId = null; + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + final DiscoveryNodes nodes = state.nodes(); + for (ObjectObjectCursor cursor : nodes.getNodes()) { + final String name = cursor.value.getName(); + if (name.equals(nodeName)) { + primaryNodeId = cursor.key; + break; + } + } + assertThat(primaryNodeId, notNullValue()); + return primaryNodeId; + } + + private int corruptIndexAndGetExpectedNumDocs(String nodeName, String indexName, int numDocs, + ShardId shardId, Path indexPath) throws Exception { + // corrupt index on node restart + internalCluster().restartNode(nodeName, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String node) throws Exception { + CorruptionUtils.corruptIndex(random(), indexPath, false); + return super.onNodeStopped(node); + } + }); + + // all shards should be failed due to a corrupted index + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(shardId.id()).setPrimary(true) + .get().getExplanation(); + + final UnassignedInfo unassignedInfo = explanation.getUnassignedInfo(); + assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + }); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeName)); + + // drop corrupted documents from index, but keep the same historyUUID + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand() 
{ + @Override + protected void addNewHistoryCommit(Directory indexDirectory, Terminal terminal11, boolean updateLocalCheckpoint) { + // do not create a new history commit + } + }; + + final OptionParser parser = command.getParser(); + final Environment environment = TestEnvironment.newEnvironment(internalCluster().getDefaultSettings()); + final OptionSet options = parser.parse("-index", indexName, "-shard-id", Integer.toString(shardId.id())); + final MockTerminal terminal = new MockTerminal(); + terminal.addTextInput("y"); + command.execute(terminal, options, environment); + // grab number of corrupted docs from RemoveCorruptedShardDataCommand output + return RemoveCorruptedShardDataCommandIT.getExpectedNumDocs(numDocs, terminal); + } + + private Path getIndexPath(String nodeName, ShardId shardId) { + final Set indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME); + assertThat(indexDirs, hasSize(1)); + return indexDirs.iterator().next(); + } + + private Set getAllocationIds(String indexName) { + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + final Set allocationIds = state.metaData().index(indexName).inSyncAllocationIds(0); + return allocationIds; + } + + private IndexSettings getIndexSettings(String indexName) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class); + final IndexService indexService = indicesService.indexService(resolveIndex(indexName)); + return indexService.getIndexSettings(); + } + + private Set historyUUIDs(String node, String indexName) { + final ShardStats[] shards = client(node).admin().indices().prepareStats(indexName).clear().get().getShards(); + assertThat(shards.length, greaterThan(0)); + return Arrays.stream(shards).map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY)) + .collect(Collectors.toSet()); + } + + private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId 
shardId, Path indexPath) throws IOException { + try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) { + store.markStoreCorrupted(new IOException("fake ioexception")); + } + } + + private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception { + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(shardId.id()).setPrimary(true) + .get().getExplanation(); + + final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); + assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); + assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), + equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + }); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java index dc3be31734d5c..30a1831e3a579 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java @@ -74,6 +74,7 @@ import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; @@ -241,16 +242,20 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertThat(explanation.getShardState(), equalTo(ShardRoutingState.STARTED)); }); - final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?\\d+) documents will be lost."); - final Matcher matcher = pattern.matcher(terminal.getOutput()); - assertThat(matcher.find(), equalTo(true)); - final int expectedNumDocs = numDocs - 
Integer.parseInt(matcher.group("docs")); + final int expectedNumDocs = getExpectedNumDocs(numDocs, terminal); ensureGreen(indexName); assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs); } + public static int getExpectedNumDocs(int numDocs, MockTerminal terminal) throws UnsupportedEncodingException { + final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?\\d+) documents will be lost."); + final Matcher matcher = pattern.matcher(terminal.getOutput()); + assertThat(matcher.find(), equalTo(true)); + return numDocs - Integer.parseInt(matcher.group("docs")); + } + public void testCorruptTranslogTruncation() throws Exception { internalCluster().startNodes(2, Settings.EMPTY); @@ -612,7 +617,7 @@ private Set getDirs(String indexName, String dirSuffix) { return getDirs(nodeId, shardId, dirSuffix); } - private Set getDirs(String nodeId, ShardId shardId, String dirSuffix) { + public static Set getDirs(String nodeId, ShardId shardId, String dirSuffix) { final NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); final Set translogDirs = new TreeSet<>(); final NodeStats nodeStats = nodeStatses.getNodes().get(0); From 02e5e055332fca3a1709408214d7728dddbb1d9b Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 1 Oct 2018 11:59:18 +0200 Subject: [PATCH 03/25] simplify test case to spot the problem if allocation id is adjusted before recovery is done (historyUUID is adjusted) --- .../cluster/routing/AllocationIdIT.java | 201 +++++++----------- .../RemoveCorruptedShardDataCommandIT.java | 13 +- 2 files changed, 83 insertions(+), 131 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index b0e84cb54cb94..feb86a33c5f88 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -20,41 +20,31 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import joptsimple.OptionParser; -import joptsimple.OptionSet; -import org.apache.lucene.store.Directory; import org.apache.lucene.store.SimpleFSDirectory; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.cli.MockTerminal; -import org.elasticsearch.cli.Terminal; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationDecision; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; -import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand; import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import 
org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -91,94 +81,94 @@ protected Collection> nodePlugins() { } public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Exception { - /** + /* * test case to spot the problem if allocation id is adjusted before recovery is done (historyUUID is adjusted): * - * - start node1 (primary), index some docs, start node2 (replica) all docs are replicated; same historyUUID - * - stop node2 (replica) - * - corrupt index on node1, drop corrupted parts it with a corruption tool but don't change historyUUID - * (if we start both node1 and node2 no any recovery would be done due to same historyUUID while node1 has less docs than node2) - * after corruption tool it is required AllocateStalePrimary to be called. + * - Master node + 2 data nodes + * - (1) index some docs + * - stop primary (node1) + * - (2) index more docs to a formal replica (node2) + * - stop node2 * - * we'd like to disrupt the recovery (local) process: - * - put a fake corruption marker on node1 to disrupt the recovery - * - start node - allocation id is adjusted (*) - recovery failed and historyUUID is not changed - * - remove a fake corruption marker from node1 and start it again - * - the interruption on recovery has the effect close to AllocateStalePrimary is ignored if a real allocation - * id in (*) is persisted before recovery is done (and historyUUID is changed) - * - index is RED as shard allocation id does not match persisted at (*) allocation id (_forced_allocation) - * - AllocateStalePrimary has to be called again - * it fails if a shard allocation id at (*) is persisted - * - index turns to YELLOW - * - start replica -> after (a full) recovery index turns GREEN and all shards have the same number of docs ( numDocs - corrupted ) - * no recovery take place 
if a shard allocation id at (*) is persisted => - * => nodes are fully in-sync but have diff number of docs + * node1 would not be a new primary due to master node state - it is required to run AllocateStalePrimary + * - put a corruption marker to node1 to interrupt recovery + * - start node1 (shard would not start as it is stale) + * - allocate stale primary - allocation id is adjusted (*) - but it fails due to the presence of corruption marker + * - stop node1 + * - stop node0 (master node) to forget about recoverySource (it is stored in a routing table) + * - drop a corruption marker + * - start node0 (master) and node1 + * -> node0 becomes a new primary with the same historyUUID if (*) has a real allocation id + * -> node0 has a RED index if (*) points to a fake shard -> node requires another AllocateStalePrimary + * - index same amount of docs to node1 as it was added at (2) + * + * - node1 and node2 have the same number of docs ( but with different docs ) + * - bring node2 back + * -> no recovery take place if a shard allocation id at (*) is persisted => nodes are fully in-sync but have diff docs + * -> due to fake allocation id at (*) AllocateStalePrimary is forced for the 2nd time and a full recovery takes place */ - // initial set up final String indexName = "index42"; - final String node1 = internalCluster().startNode(); + String node0 = internalCluster().startMasterOnlyNode(); + String node1 = internalCluster().startNode(); + createIndex(indexName); final int numDocs = indexDocs(indexName); - final IndexSettings indexSettings = getIndexSettings(indexName); + final IndexSettings indexSettings = getIndexSettings(indexName, node1); final String primaryNodeId = getNodeIdByName(node1); final Set allocationIds = getAllocationIds(indexName); final ShardId shardId = new ShardId(resolveIndex(indexName), 0); final Path indexPath = getIndexPath(node1, shardId); assertThat(allocationIds, hasSize(1)); final Set historyUUIDs = historyUUIDs(node1, indexName); - final 
String node2 = internalCluster().startNode(); + String node2 = internalCluster().startNode(); ensureGreen(indexName); assertSameDocIdsOnShards(); // initial set up is done - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1)); - final int expectedNumDocs = corruptIndexAndGetExpectedNumDocs(node1, indexName, numDocs, shardId, indexPath); + // index more docs to node2 (formal replica) + int numExtraDocs = randomIntBetween(10, 100); + { + IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar2"); + } - // start nodes in the same order to pick up same data folders; same as node1 - final String node3 = internalCluster().startNode(); + indexRandom(true, false, false, Arrays.asList(builders)); + flush(indexName); + assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); + } - // there is only _stale_ primary (due to new allocation id) - checkNoValidShardCopy(indexName, shardId); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2)); // create fake corrupted marker putFakeCorruptionMarker(indexSettings, shardId, indexPath); + // thanks to master node1 is out of sync + node1 = internalCluster().startNode(); + + // there is only _stale_ primary + checkNoValidShardCopy(indexName, shardId); + // allocate stale primary - client().admin().cluster().prepareReroute() + client(node1).admin().cluster().prepareReroute() .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true)) .get(); - // no valid shard copy as shard failed due to corruption marker - checkNoValidShardCopy(indexName, shardId); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1)); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node0)); - // 
restart node1 and remove fake corruption marker to be able to start shard - internalCluster().restartNode(node3, new InternalTestCluster.RestartCallback() { - @Override - public Settings onNodeStopped(String nodeName) throws Exception { - try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) { - store.removeCorruptionMarker(); - } - return super.onNodeStopped(nodeName); - } - }); + try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) { + store.removeCorruptionMarker(); + } - // check that allocation id is changed - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final MetaData indexMetaData = clusterState.metaData(); - final IndexMetaData index = indexMetaData.index(indexName); - assertThat(index, notNullValue()); - final Set newAllocationIds = index.inSyncAllocationIds(0); - assertThat(newAllocationIds, hasSize(1)); - assertThat(newAllocationIds, everyItem(not(isIn(allocationIds)))); - }); + node0 = internalCluster().startMasterOnlyNode(); + node1 = internalCluster().startNode(); - // that we can have in case of put a real (not a fake) allocation id: - // + // that we can have w/o fake id: // ensureYellow(indexName); - // Set newHistoryUUIds = historyUUIDs(indexName); - // assertThat(newHistoryUUIds, equalTo(historyUUIDs)); + // assertThat(historyUUIDs(node12, indexName), equalTo(historyUUIDs)); // index has to red: no any shard is allocated (allocation id is a fake id that does not match to anything) final ClusterHealthStatus indexHealthStatus = client().admin().cluster() @@ -192,24 +182,30 @@ public Settings onNodeStopped(String nodeName) throws Exception { ensureYellow(indexName); - // node4 uses same data folder as node2 - final String node4 = internalCluster().startNode(); + { + IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs]; + for (int i = 0; i 
< builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar3"); + } + + indexRandom(true, false, false, Arrays.asList(builders)); + flush(indexName); + } + assertHitCount(client(node1).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); - // wait for the replica is fully recovered + // bring node2 back + node2 = internalCluster().startNode(); ensureGreen(indexName); - // check that historyUUID is changed - assertThat(historyUUIDs(node3, indexName), everyItem(not(isIn(historyUUIDs)))); - assertThat(historyUUIDs(node4, indexName), everyItem(not(isIn(historyUUIDs)))); + assertThat(historyUUIDs(node1, indexName), everyItem(not(isIn(historyUUIDs)))); + assertThat(historyUUIDs(node1, indexName), equalTo(historyUUIDs(node2, indexName))); - // both primary and replica have the same number of docs - assertHitCount(client(node3).prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs); - // otherwise - replica has not been recovered and it has more docs than primary - assertHitCount(client(node4).prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs); + assertHitCount(client(node1).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); + assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); assertSameDocIdsOnShards(); } - private int indexDocs(String indexName) throws InterruptedException, ExecutionException { + private void createIndex(String indexName) { assertAcked(prepareCreate(indexName) .setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -218,6 +214,9 @@ private int indexDocs(String indexName) throws InterruptedException, ExecutionEx .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))); + } + + private 
int indexDocs(String indexName) throws InterruptedException, ExecutionException { // index some docs in several segments int numDocs = 0; for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) { @@ -229,7 +228,7 @@ private int indexDocs(String indexName) throws InterruptedException, ExecutionEx numDocs += numExtraDocs; - indexRandom(false, false, false, Arrays.asList(builders)); + indexRandom(true, false, false, Arrays.asList(builders)); flush(indexName); } @@ -251,48 +250,6 @@ private String getNodeIdByName(String nodeName) { return primaryNodeId; } - private int corruptIndexAndGetExpectedNumDocs(String nodeName, String indexName, int numDocs, - ShardId shardId, Path indexPath) throws Exception { - // corrupt index on node restart - internalCluster().restartNode(nodeName, new InternalTestCluster.RestartCallback() { - @Override - public Settings onNodeStopped(String node) throws Exception { - CorruptionUtils.corruptIndex(random(), indexPath, false); - return super.onNodeStopped(node); - } - }); - - // all shards should be failed due to a corrupted index - assertBusy(() -> { - final ClusterAllocationExplanation explanation = - client().admin().cluster().prepareAllocationExplain() - .setIndex(indexName).setShard(shardId.id()).setPrimary(true) - .get().getExplanation(); - - final UnassignedInfo unassignedInfo = explanation.getUnassignedInfo(); - assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); - }); - - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeName)); - - // drop corrupted documents from index, but keep the same historyUUID - final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand() { - @Override - protected void addNewHistoryCommit(Directory indexDirectory, Terminal terminal11, boolean updateLocalCheckpoint) { - // do not create a new history commit - } - }; - - final OptionParser parser = command.getParser(); - final Environment environment = 
TestEnvironment.newEnvironment(internalCluster().getDefaultSettings()); - final OptionSet options = parser.parse("-index", indexName, "-shard-id", Integer.toString(shardId.id())); - final MockTerminal terminal = new MockTerminal(); - terminal.addTextInput("y"); - command.execute(terminal, options, environment); - // grab number of corrupted docs from RemoveCorruptedShardDataCommand output - return RemoveCorruptedShardDataCommandIT.getExpectedNumDocs(numDocs, terminal); - } - private Path getIndexPath(String nodeName, ShardId shardId) { final Set indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME); assertThat(indexDirs, hasSize(1)); @@ -305,8 +262,8 @@ private Set getAllocationIds(String indexName) { return allocationIds; } - private IndexSettings getIndexSettings(String indexName) { - final IndicesService indicesService = internalCluster().getInstance(IndicesService.class); + private IndexSettings getIndexSettings(String indexName, String nodeName) { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); final IndexService indexService = indicesService.indexService(resolveIndex(indexName)); return indexService.getIndexSettings(); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java index 30a1831e3a579..69b63a7cf64ce 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java @@ -74,7 +74,6 @@ import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; @@ -242,20 +241,16 @@ public Settings onNodeStopped(String nodeName) throws Exception { 
assertThat(explanation.getShardState(), equalTo(ShardRoutingState.STARTED)); }); - final int expectedNumDocs = getExpectedNumDocs(numDocs, terminal); + final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?\\d+) documents will be lost."); + final Matcher matcher = pattern.matcher(terminal.getOutput()); + assertThat(matcher.find(), equalTo(true)); + final int expectedNumDocs = numDocs - Integer.parseInt(matcher.group("docs")); ensureGreen(indexName); assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs); } - public static int getExpectedNumDocs(int numDocs, MockTerminal terminal) throws UnsupportedEncodingException { - final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?\\d+) documents will be lost."); - final Matcher matcher = pattern.matcher(terminal.getOutput()); - assertThat(matcher.find(), equalTo(true)); - return numDocs - Integer.parseInt(matcher.group("docs")); - } - public void testCorruptTranslogTruncation() throws Exception { internalCluster().startNodes(2, Settings.EMPTY); From 8b697ef3774f3296a8ca7c7636719370448b17b6 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Tue, 2 Oct 2018 13:23:01 +0200 Subject: [PATCH 04/25] extended testAllocateCommand: added AllocateStalePrimaryAllocationCommand --- .../allocation/AllocationCommandsTests.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java index da0920e69373b..3ce59dbe99783 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java @@ -28,8 +28,11 @@ import org.elasticsearch.cluster.metadata.MetaData; import 
org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; @@ -58,6 +61,8 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.List; +import java.util.Set; import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; @@ -66,6 +71,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -245,6 +251,29 @@ public void testAllocateCommand() { fail("expected IllegalArgumentException when allocating shard while no unassigned shard available"); } catch (IllegalArgumentException e) { } + + // mark all shards as stale + final List shardRoutings = clusterState.getRoutingNodes().shardsWithState(STARTED); + assertThat(shardRoutings, hasSize(2)); + clusterState = allocation.applyFailedShard(clusterState, shardRoutings.get(0), true); + clusterState = allocation.applyFailedShard(clusterState, shardRoutings.get(1), true); + + logger.info("--> allocating empty primary with acceptDataLoss flag set to true"); + clusterState = allocation.reroute(clusterState, + new AllocationCommands(new AllocateStalePrimaryAllocationCommand("test", 0, "node1", true)), false, false).getClusterState(); + 
RoutingNode routingNode1 = clusterState.getRoutingNodes().node("node1"); + assertThat(routingNode1.size(), equalTo(1)); + assertThat(routingNode1.shardsWithState(INITIALIZING).size(), equalTo(1)); + Set inSyncAllocationIds = clusterState.metaData().index("test").inSyncAllocationIds(0); + assertThat(inSyncAllocationIds, equalTo(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID))); + + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + routingNode1 = clusterState.getRoutingNodes().node("node1"); + assertThat(routingNode1.size(), equalTo(1)); + assertThat(routingNode1.shardsWithState(STARTED).size(), equalTo(1)); + inSyncAllocationIds = clusterState.metaData().index("test").inSyncAllocationIds(0); + assertThat(inSyncAllocationIds, hasSize(1)); + assertThat(inSyncAllocationIds, not(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID))); } public void testCancelCommand() { From e331b1f6e438b885c8749a71194d5e2c77f54b19 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Tue, 2 Oct 2018 13:24:47 +0200 Subject: [PATCH 05/25] enforced check that there is only one allocation id on adding/removing fake allocation id --- .../org/elasticsearch/cluster/routing/IndexRoutingTable.java | 3 ++- .../cluster/routing/allocation/IndexMetaDataUpdater.java | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index c3eb217003e60..3aca5fa0798d9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -142,7 +142,8 @@ boolean validate(MetaData metaData) { if (shardRouting.primary() && shardRouting.initializing() && shardRouting.recoverySource().getType() 
== RecoverySource.Type.EXISTING_STORE && inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false && - inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false) + (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false + || inSyncAllocationIds.size() != 1)) throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " + "a known allocation id but has no corresponding entry in the in-sync " + "allocation set " + inSyncAllocationIds); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index 8de8e4460bca8..18ed0801abadc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -177,6 +177,11 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab inSyncAllocationIds.addAll(updates.addedAllocationIds); inSyncAllocationIds.removeAll(updates.removedAllocationIds); + assert oldInSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false + || inSyncAllocationIds.size() == 1 + && inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false : + "fake allocation id has to be removed, inSyncAllocationIds:" + inSyncAllocationIds; + // Prevent set of inSyncAllocationIds to grow unboundedly. This can happen for example if we don't write to a primary // but repeatedly shut down nodes that have active replicas. 
// We use number_of_replicas + 1 (= possible active shard copies) to bound the inSyncAllocationIds set From d3df30ccefeb60880553071cf882704f972fd301 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Wed, 3 Oct 2018 11:17:35 +0200 Subject: [PATCH 06/25] fix BalanceConfigurationTests: using TestGatewayAllocator instead of NoopGatewayAllocator that generated inconsistent cluster state --- .../allocation/BalanceConfigurationTests.java | 29 ++----------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java index 5e400d95e4b02..7dfca57ee90a9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -41,12 +41,9 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.test.gateway.TestGatewayAllocator; import org.hamcrest.Matchers; -import java.util.List; - import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; @@ -71,7 +68,7 @@ public void testIndexBalance() { settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold); - AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator()); + AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); ClusterState clusterState = initCluster(strategy); 
assertIndexBalance(clusterState.getRoutingTable(), clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); @@ -95,7 +92,7 @@ public void testReplicaBalance() { settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceTreshold); - AllocationService strategy = createAllocationService(settings.build(), new NoopGatewayAllocator()); + AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); ClusterState clusterState = initCluster(strategy); assertReplicaBalance(logger, clusterState.getRoutingNodes(), numberOfNodes, numberOfIndices, numberOfReplicas, numberOfShards, balanceTreshold); @@ -352,7 +349,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing assertThat(shardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING)); } } - strategy = createAllocationService(settings.build(), new NoopGatewayAllocator()); + strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); logger.info("use the new allocator and check if it moves shards"); routingNodes = clusterState.getRoutingNodes(); @@ -388,24 +385,4 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing } } - private class NoopGatewayAllocator extends GatewayAllocator { - - NoopGatewayAllocator() { - super(Settings.EMPTY); - } - - @Override - public void applyStartedShards(RoutingAllocation allocation, List startedShards) { - // noop - } - - @Override - public void applyFailedShards(RoutingAllocation allocation, List failedShards) { - // noop - } - @Override - public void allocateUnassigned(RoutingAllocation allocation) { - // noop - } - } } From fbdf6d77b6e5fe2d66666a02f20818a99ce7f105 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Wed, 3 Oct 2018 11:18:24 +0200 Subject: [PATCH 07/25] enforce check for 
fake_allocation: for existing store it could be only FORCE_STALE_PRIMARY_INSTANCE --- .../cluster/routing/allocation/IndexMetaDataUpdater.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index 18ed0801abadc..c7d05b0f0dc27 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -71,7 +71,7 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha addAllocationId(startedShard); if (startedShard.primary() // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state - && (initializingShard.recoverySource() instanceof RecoverySource.ExistingStoreRecoverySource + && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE || initializingShard.recoverySource() instanceof RecoverySource.SnapshotRecoverySource)) { Updates updates = changes(startedShard.shardId()); updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); @@ -147,6 +147,7 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab Set oldInSyncAllocationIds = oldIndexMetaData.inSyncAllocationIds(shardId.id()); // check if we have been force-initializing an empty primary or a stale primary + // the same is applicable for snapshot/restore as it can be considered as allocating a stale primary. 
if (updates.initializedPrimary != null && oldInSyncAllocationIds.isEmpty() == false && oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) { // we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating @@ -164,7 +165,7 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab // forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate) indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet()); } else { - assert recoverySource instanceof RecoverySource.ExistingStoreRecoverySource + assert recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE || recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource; // forcing a stale primary resets the in-sync allocations to the singleton set with the stale id From 8f909af05e06911e432cdc3130e4de50f87ceb06 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Thu, 25 Oct 2018 17:06:57 +0200 Subject: [PATCH 08/25] inline addAllocationId; add assert on initializingShard.allocationId() equal to startedShard.allocationId() --- .../routing/allocation/IndexMetaDataUpdater.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index c7d05b0f0dc27..25126421f795c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -68,7 +69,9 @@ public 
void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali @Override public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { - addAllocationId(startedShard); + assert Objects.equals(initializingShard.allocationId(), startedShard.allocationId()) + : "initializingShard and startedShard have to have the same allocation id"; + changes(startedShard.shardId()).addedAllocationIds.add(startedShard.allocationId().getId()); if (startedShard.primary() // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE @@ -304,14 +307,6 @@ void removeAllocationId(ShardRouting shardRouting) { } } - /** - * Add allocation id of this shard to the set of in-sync shard copies - */ - private void addAllocationId(ShardRouting shardRouting) { - final Updates changes = changes(shardRouting.shardId()); - changes.addedAllocationIds.add(shardRouting.allocationId().getId()); - } - /** * Increase primary term for this shard id */ From 3554615d9dd3c18ec59bd3b80e346f5563083946 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Thu, 25 Oct 2018 17:07:47 +0200 Subject: [PATCH 09/25] java doc for FORCED_ALLOCATION_ID; use front and back underscore for the fake allocation id --- .../org/elasticsearch/cluster/routing/RecoverySource.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index 09be960dff20b..c72c25a0578c3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -132,7 +132,11 @@ public String toString() { * Recovery from an existing on-disk store */ public static final class 
ExistingStoreRecoverySource extends RecoverySource { - public static final String FORCED_ALLOCATION_ID = "_forced_allocation"; + /** + * Special allocation id that shard has during initialization on allocate_stale_primary + */ + public static final String FORCED_ALLOCATION_ID = "_forced_allocation_"; + public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false); public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true); From 70a837939a6b0f1bfa8653c535465504e474bbaf Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Fri, 26 Oct 2018 08:00:13 +0200 Subject: [PATCH 10/25] S/R deserves its own allocation id --- .../allocation/IndexMetaDataUpdater.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index 25126421f795c..65fc998cc2467 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -150,7 +150,6 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab Set oldInSyncAllocationIds = oldIndexMetaData.inSyncAllocationIds(shardId.id()); // check if we have been force-initializing an empty primary or a stale primary - // the same is applicable for snapshot/restore as it can be considered as allocating a stale primary. 
if (updates.initializedPrimary != null && oldInSyncAllocationIds.isEmpty() == false && oldInSyncAllocationIds.contains(updates.initializedPrimary.allocationId().getId()) == false) { // we're not reusing an existing in-sync allocation id to initialize a primary, which means that we're either force-allocating @@ -168,12 +167,15 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab // forcing an empty primary resets the in-sync allocations to the empty set (ShardRouting.allocatedPostIndexCreate) indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.emptySet()); } else { - assert recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE - || recoverySource instanceof RecoverySource.SnapshotRecoverySource - : recoverySource; + final String allocationId; + if (recoverySource == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE) { + allocationId = RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID; + } else { + assert recoverySource instanceof RecoverySource.SnapshotRecoverySource : recoverySource; + allocationId = updates.initializedPrimary.allocationId().getId(); + } // forcing a stale primary resets the in-sync allocations to the singleton set with the stale id - indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), - Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)); + indexMetaDataBuilder.putInSyncAllocationIds(shardId.id(), Collections.singleton(allocationId)); } } else { // standard path for updating in-sync ids @@ -182,8 +184,7 @@ private IndexMetaData.Builder updateInSyncAllocations(RoutingTable newRoutingTab inSyncAllocationIds.removeAll(updates.removedAllocationIds); assert oldInSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false - || inSyncAllocationIds.size() == 1 - && 
inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false : + || inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false : "fake allocation id has to be removed, inSyncAllocationIds:" + inSyncAllocationIds; // Prevent set of inSyncAllocationIds to grow unboundedly. This can happen for example if we don't write to a primary From 9ac69bdb4adf3ef1cc9767dcc1d93d3754f88fd0 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 29 Oct 2018 09:49:58 +0100 Subject: [PATCH 11/25] fix assert on initializingShard.allocationId() equal to startedShard.allocationId() --- .../cluster/routing/allocation/IndexMetaDataUpdater.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index 65fc998cc2467..f1275759089ea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -69,8 +69,9 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali @Override public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { - assert Objects.equals(initializingShard.allocationId(), startedShard.allocationId()) - : "initializingShard and startedShard have to have the same allocation id"; + assert Objects.equals(initializingShard.allocationId().getId(), startedShard.allocationId().getId()) + : "initializingShard.allocationId [" + initializingShard.allocationId().getId() + + "] and startedShard.allocationId [" + startedShard.allocationId().getId() + "] have to have the same"; changes(startedShard.shardId()).addedAllocationIds.add(startedShard.allocationId().getId()); if (startedShard.primary() // started 
shard has to have null recoverySource; have to pick up recoverySource from its initializing state From 46183b1f64bd83ac3a47099c907478ee808cc88e Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 29 Oct 2018 11:17:21 +0100 Subject: [PATCH 12/25] fixed index routing table validation message for fake allocation id case --- .../cluster/routing/IndexRoutingTable.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 3aca5fa0798d9..6127b6ea73bc0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -140,14 +140,19 @@ boolean validate(MetaData metaData) { } if (shardRouting.primary() && shardRouting.initializing() && - shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE && - inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false && - (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID) == false - || inSyncAllocationIds.size() != 1)) - throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " + - "a known allocation id but has no corresponding entry in the in-sync " + - "allocation set " + inSyncAllocationIds); - + shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE) { + if (inSyncAllocationIds.contains(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID)) { + if (inSyncAllocationIds.size() != 1) { + throw new IllegalStateException("a primary shard routing " + shardRouting + + " is a primary that is recovering from a stale primary has unexpected allocation ids in in-sync " + + "allocation set " + inSyncAllocationIds); + } + } else if 
(inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false) { + throw new IllegalStateException("a primary shard routing " + shardRouting + + " is a primary that is recovering from a known allocation id but has no corresponding entry in the in-sync " + + "allocation set " + inSyncAllocationIds); + } + } } } return true; From e14d0948381606810f6e940e4d1a45638025c26e Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 29 Oct 2018 12:32:24 +0100 Subject: [PATCH 13/25] extract AllocateStalePrimaryCommand to its own test method --- .../allocation/AllocationCommandsTests.java | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java index 3ce59dbe99783..fadda5520b12b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -69,6 +70,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -251,27 +253,54 @@ public void testAllocateCommand() { 
fail("expected IllegalArgumentException when allocating shard while no unassigned shard available"); } catch (IllegalArgumentException e) { } + } + + public void testAllocateStalePrimaryCommand() { + AllocationService allocation = createAllocationService(Settings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none") + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") + .build()); + final String index = "test"; + + logger.info("--> building initial routing table"); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(index).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1) + .putInSyncAllocationIds(0, Collections.singleton("asdf")).putInSyncAllocationIds(1, Collections.singleton("qwertz"))) + .build(); + // shard routing is added as "from recovery" instead of "new index creation" so that we can test below that allocating an empty + // primary with accept_data_loss flag set to false fails + RoutingTable routingTable = RoutingTable.builder() + .addAsRecovery(metaData.index(index)) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData).routingTable(routingTable).build(); + + final String node1 = "node1"; + final String node2 = "node2"; + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .add(newNode(node1)) + .add(newNode(node2)) + ).build(); + clusterState = allocation.reroute(clusterState, "reroute"); // mark all shards as stale - final List shardRoutings = clusterState.getRoutingNodes().shardsWithState(STARTED); + final List shardRoutings = clusterState.getRoutingNodes().shardsWithState(UNASSIGNED); assertThat(shardRoutings, hasSize(2)); - clusterState = allocation.applyFailedShard(clusterState, shardRoutings.get(0), true); - clusterState = allocation.applyFailedShard(clusterState, shardRoutings.get(1), true); 
logger.info("--> allocating empty primary with acceptDataLoss flag set to true"); clusterState = allocation.reroute(clusterState, - new AllocationCommands(new AllocateStalePrimaryAllocationCommand("test", 0, "node1", true)), false, false).getClusterState(); - RoutingNode routingNode1 = clusterState.getRoutingNodes().node("node1"); + new AllocationCommands(new AllocateStalePrimaryAllocationCommand(index, 0, node1, true)), false, false).getClusterState(); + RoutingNode routingNode1 = clusterState.getRoutingNodes().node(node1); assertThat(routingNode1.size(), equalTo(1)); assertThat(routingNode1.shardsWithState(INITIALIZING).size(), equalTo(1)); - Set inSyncAllocationIds = clusterState.metaData().index("test").inSyncAllocationIds(0); + Set inSyncAllocationIds = clusterState.metaData().index(index).inSyncAllocationIds(0); assertThat(inSyncAllocationIds, equalTo(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID))); clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); - routingNode1 = clusterState.getRoutingNodes().node("node1"); + routingNode1 = clusterState.getRoutingNodes().node(node1); assertThat(routingNode1.size(), equalTo(1)); assertThat(routingNode1.shardsWithState(STARTED).size(), equalTo(1)); - inSyncAllocationIds = clusterState.metaData().index("test").inSyncAllocationIds(0); + inSyncAllocationIds = clusterState.metaData().index(index).inSyncAllocationIds(0); assertThat(inSyncAllocationIds, hasSize(1)); assertThat(inSyncAllocationIds, not(Collections.singleton(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID))); } From 72a24a40bd2e0b5207e2c2cb840d8f58e42022e5 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 29 Oct 2018 21:35:57 +0100 Subject: [PATCH 14/25] simplify AllocationIdIT --- .../cluster/routing/AllocationIdIT.java | 108 ++++++++---------- 1 file changed, 50 insertions(+), 58 deletions(-) diff --git 
a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index feb86a33c5f88..83a5c16f2a9cd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; @@ -80,14 +81,14 @@ protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class); } - public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Exception { - /* - * test case to spot the problem if allocation id is adjusted before recovery is done (historyUUID is adjusted): + public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStalePrimary() throws Exception { + /* test case to spot the problem if allocation id is adjusted before recovery is done (historyUUID is adjusted): + * ( if recovery is failed on AllocateStalePrimary it requires another AllocateStalePrimary ) * * - Master node + 2 data nodes * - (1) index some docs * - stop primary (node1) - * - (2) index more docs to a formal replica (node2) + * - (2) index more docs to a node2, that marks node1 as stale * - stop node2 * * node1 would not be a new primary due to master node state - it is required to run AllocateStalePrimary @@ -95,24 +96,22 @@ public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Ex * - start node1 (shard would not start as it is stale) * - allocate 
stale primary - allocation id is adjusted (*) - but it fails due to the presence of corruption marker * - stop node1 - * - stop node0 (master node) to forget about recoverySource (it is stored in a routing table) - * - drop a corruption marker - * - start node0 (master) and node1 - * -> node0 becomes a new primary with the same historyUUID if (*) has a real allocation id - * -> node0 has a RED index if (*) points to a fake shard -> node requires another AllocateStalePrimary - * - index same amount of docs to node1 as it was added at (2) - * - * - node1 and node2 have the same number of docs ( but with different docs ) + * - stop master to forget about recoverySource (it is stored in a routing table) + * - remove a corruption marker + * - try to open index on node1 + * -> node1 has a RED index if (*) points to a fake shard -> node requires another AllocateStalePrimary + * - check that restart does not help + * - another manual intervention of AllocateStalePrimary brings index to yellow state * - bring node2 back - * -> no recovery take place if a shard allocation id at (*) is persisted => nodes are fully in-sync but have diff docs - * -> due to fake allocation id at (*) AllocateStalePrimary is forced for the 2nd time and a full recovery takes place + * -> nodes are fully in-sync and has different to initial history uuid */ + // initial set up final String indexName = "index42"; - String node0 = internalCluster().startMasterOnlyNode(); + String master = internalCluster().startMasterOnlyNode(); String node1 = internalCluster().startNode(); createIndex(indexName); - final int numDocs = indexDocs(indexName); + final int numDocs = indexDocs(indexName, "foo", "bar"); final IndexSettings indexSettings = getIndexSettings(indexName, node1); final String primaryNodeId = getNodeIdByName(node1); final Set allocationIds = getAllocationIds(indexName); @@ -127,22 +126,13 @@ public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Ex 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1)); - // index more docs to node2 (formal replica) - int numExtraDocs = randomIntBetween(10, 100); - { - IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs]; - for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar2"); - } - - indexRandom(true, false, false, Arrays.asList(builders)); - flush(indexName); - assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); - } + // index more docs to node2 that marks node1 as stale + int numExtraDocs = indexDocs(indexName, "foo", "bar2"); + assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2)); - // create fake corrupted marker + // create fake corrupted marker on node1 putFakeCorruptionMarker(indexSettings, shardId, indexPath); // thanks to master node1 is out of sync @@ -156,24 +146,35 @@ public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Ex .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true)) .get(); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1)); - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node0)); + // allocation fails due to corruption marker + assertBusy(() -> { + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + final ShardRouting shardRouting = state.routingTable().index(indexName).shard(shardId.id()).primaryShard(); + assertThat(shardRouting.state(), equalTo(ShardRoutingState.UNASSIGNED)); + assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + }); + + // stop master to forget about recoverySource, it is stored in a routing table (in memory, not in a disk state) + 
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(master)); try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) { store.removeCorruptionMarker(); } - node0 = internalCluster().startMasterOnlyNode(); - node1 = internalCluster().startNode(); + master = internalCluster().startMasterOnlyNode(); - // that we can have w/o fake id: - // ensureYellow(indexName); - // assertThat(historyUUIDs(node12, indexName), equalTo(historyUUIDs)); + // open index - no any reasons to wait 30 sec as no any health shard for that + client(node1).admin().indices().prepareOpen(indexName).setTimeout(TimeValue.timeValueSeconds(5)).get(); - // index has to red: no any shard is allocated (allocation id is a fake id that does not match to anything) - final ClusterHealthStatus indexHealthStatus = client().admin().cluster() - .health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus(); - assertThat(indexHealthStatus, is(ClusterHealthStatus.RED)); + // index is red: no any shard is allocated (allocation id is a fake id that does not match to anything) + checkHealthStatus(indexName, ClusterHealthStatus.RED); + checkNoValidShardCopy(indexName, shardId); + + internalCluster().restartNode(node1, InternalTestCluster.EMPTY_CALLBACK); + + // index is still red due to mismatch of allocation id + checkHealthStatus(indexName, ClusterHealthStatus.RED); + checkNoValidShardCopy(indexName, shardId); // no any valid shard is there; have to invoke AllocateStalePrimary again client().admin().cluster().prepareReroute() @@ -182,17 +183,6 @@ public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Ex ensureYellow(indexName); - { - IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs]; - for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar3"); - } - - indexRandom(true, false, false, Arrays.asList(builders)); - 
flush(indexName); - } - assertHitCount(client(node1).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); - // bring node2 back node2 = internalCluster().startNode(); ensureGreen(indexName); @@ -200,11 +190,15 @@ public void testAllocationIdIsSetAfterRecoveryOnAllocateStalePrimary() throws Ex assertThat(historyUUIDs(node1, indexName), everyItem(not(isIn(historyUUIDs)))); assertThat(historyUUIDs(node1, indexName), equalTo(historyUUIDs(node2, indexName))); - assertHitCount(client(node1).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); - assertHitCount(client(node2).prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocs + numExtraDocs); assertSameDocIdsOnShards(); } + public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus) { + final ClusterHealthStatus indexHealthStatus = client().admin().cluster() + .health(Requests.clusterHealthRequest(indexName)).actionGet().getStatus(); + assertThat(indexHealthStatus, is(healthStatus)); + } + private void createIndex(String indexName) { assertAcked(prepareCreate(indexName) .setSettings(Settings.builder() @@ -216,20 +210,18 @@ private void createIndex(String indexName) { .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))); } - private int indexDocs(String indexName) throws InterruptedException, ExecutionException { + private int indexDocs(String indexName, Object ... 
source) throws InterruptedException, ExecutionException { // index some docs in several segments int numDocs = 0; for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) { final int numExtraDocs = between(10, 100); IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs]; for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar"); + builders[i] = client().prepareIndex(indexName, "type").setSource(source); } + indexRandom(true, false, true, Arrays.asList(builders)); numDocs += numExtraDocs; - - indexRandom(true, false, false, Arrays.asList(builders)); - flush(indexName); } return numDocs; From e06ba85969f9c0a25ce10a88bb7f30addb15a441 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Tue, 30 Oct 2018 08:39:22 +0100 Subject: [PATCH 15/25] simplify AllocationIdIT; don't restart master --- .../elasticsearch/cluster/routing/AllocationIdIT.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index 83a5c16f2a9cd..f7ce0563dc9f6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -96,7 +96,6 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale * - start node1 (shard would not start as it is stale) * - allocate stale primary - allocation id is adjusted (*) - but it fails due to the presence of corruption marker * - stop node1 - * - stop master to forget about recoverySource (it is stored in a routing table) * - remove a corruption marker * - try to open index on node1 * -> node1 has a RED index if (*) points to a fake shard -> node requires another AllocateStalePrimary @@ -108,7 +107,7 @@ public void 
testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale // initial set up final String indexName = "index42"; - String master = internalCluster().startMasterOnlyNode(); + final String master = internalCluster().startMasterOnlyNode(); String node1 = internalCluster().startNode(); createIndex(indexName); final int numDocs = indexDocs(indexName, "foo", "bar"); @@ -154,16 +153,11 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale assertThat(shardRouting.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); }); - // stop master to forget about recoverySource, it is stored in a routing table (in memory, not in a disk state) - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(master)); - try(Store store = new Store(shardId, indexSettings, new SimpleFSDirectory(indexPath), new DummyShardLock(shardId))) { store.removeCorruptionMarker(); } - master = internalCluster().startMasterOnlyNode(); - - // open index - no any reasons to wait 30 sec as no any health shard for that + // open index; no any reasons to wait 30 sec as no any health shard for that client(node1).admin().indices().prepareOpen(indexName).setTimeout(TimeValue.timeValueSeconds(5)).get(); // index is red: no any shard is allocated (allocation id is a fake id that does not match to anything) From e741bf594a80e0e2ca3f11f706c07961fd4c0c44 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Tue, 30 Oct 2018 09:30:46 +0100 Subject: [PATCH 16/25] after merge compilation fix --- .../org/elasticsearch/cluster/routing/AllocationIdIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index f7ce0563dc9f6..fe24d7723a0d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -120,7 +120,7 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale final Set historyUUIDs = historyUUIDs(node1, indexName); String node2 = internalCluster().startNode(); ensureGreen(indexName); - assertSameDocIdsOnShards(); + internalCluster().assertSameDocIdsOnShards(); // initial set up is done internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1)); @@ -184,7 +184,7 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale assertThat(historyUUIDs(node1, indexName), everyItem(not(isIn(historyUUIDs)))); assertThat(historyUUIDs(node1, indexName), equalTo(historyUUIDs(node2, indexName))); - assertSameDocIdsOnShards(); + internalCluster().assertSameDocIdsOnShards(); } public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus) { From 99dc6662fca4f9eca6513cd932d2e5353c3c7ca1 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 11:51:10 +0100 Subject: [PATCH 17/25] move updates earlier and reuse it --- .../cluster/routing/allocation/IndexMetaDataUpdater.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index f1275759089ea..ebffe82a5a246 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -72,12 +72,12 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha assert Objects.equals(initializingShard.allocationId().getId(), startedShard.allocationId().getId()) : "initializingShard.allocationId [" + initializingShard.allocationId().getId() + "] and startedShard.allocationId [" + 
startedShard.allocationId().getId() + "] have to have the same"; - changes(startedShard.shardId()).addedAllocationIds.add(startedShard.allocationId().getId()); + Updates updates = changes(startedShard.shardId()); + updates.addedAllocationIds.add(startedShard.allocationId().getId()); if (startedShard.primary() // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE || initializingShard.recoverySource() instanceof RecoverySource.SnapshotRecoverySource)) { - Updates updates = changes(startedShard.shardId()); updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); } } From e8e3925ab530e75f9acc4a919ac820681a41ab55 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 11:56:59 +0100 Subject: [PATCH 18/25] drop getNodeIdByName as AllocateStalePrimaryAllocationCommand can use node name instead of node id --- .../cluster/routing/AllocationIdIT.java | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index fe24d7723a0d7..6f5a226391905 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.lucene.store.SimpleFSDirectory; import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -28,8 +27,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import 
org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationDecision; import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; @@ -71,7 +68,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) public class AllocationIdIT extends ESIntegTestCase { @@ -112,7 +108,6 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale createIndex(indexName); final int numDocs = indexDocs(indexName, "foo", "bar"); final IndexSettings indexSettings = getIndexSettings(indexName, node1); - final String primaryNodeId = getNodeIdByName(node1); final Set allocationIds = getAllocationIds(indexName); final ShardId shardId = new ShardId(resolveIndex(indexName), 0); final Path indexPath = getIndexPath(node1, shardId); @@ -142,7 +137,7 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale // allocate stale primary client(node1).admin().cluster().prepareReroute() - .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true)) + .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true)) .get(); // allocation fails due to corruption marker @@ -172,7 +167,7 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale // no any valid shard is there; have to invoke AllocateStalePrimary again client().admin().cluster().prepareReroute() - .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true)) + .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, node1, true)) .get(); 
ensureYellow(indexName); @@ -221,21 +216,6 @@ private int indexDocs(String indexName, Object ... source) throws InterruptedExc return numDocs; } - private String getNodeIdByName(String nodeName) { - String primaryNodeId = null; - final ClusterState state = client().admin().cluster().prepareState().get().getState(); - final DiscoveryNodes nodes = state.nodes(); - for (ObjectObjectCursor cursor : nodes.getNodes()) { - final String name = cursor.value.getName(); - if (name.equals(nodeName)) { - primaryNodeId = cursor.key; - break; - } - } - assertThat(primaryNodeId, notNullValue()); - return primaryNodeId; - } - private Path getIndexPath(String nodeName, ShardId shardId) { final Set indexDirs = RemoveCorruptedShardDataCommandIT.getDirs(nodeName, shardId, ShardPath.INDEX_FOLDER_NAME); assertThat(indexDirs, hasSize(1)); From 3f02dce39201d9b33d4d4f135dbc1c3ca8430f6b Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 11:59:37 +0100 Subject: [PATCH 19/25] dropped unnecessary settings --- .../java/org/elasticsearch/cluster/routing/AllocationIdIT.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index 6f5a226391905..c1968bc66080f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -193,9 +193,6 @@ private void createIndex(String indexName) { .setSettings(Settings.builder() .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") - .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))); } From dd2fe3bbf196037cd580749c1ed41d9da8f2ef09 Mon Sep 17 
00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 12:10:34 +0100 Subject: [PATCH 20/25] handle single historyUUID --- .../cluster/routing/AllocationIdIT.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index c1968bc66080f..dd938eb344bf9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommandIT; @@ -47,7 +46,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.engine.MockEngineSupport; import org.elasticsearch.test.transport.MockTransportService; import java.io.IOException; @@ -62,11 +60,9 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) @@ -112,7 +108,7 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale final 
ShardId shardId = new ShardId(resolveIndex(indexName), 0); final Path indexPath = getIndexPath(node1, shardId); assertThat(allocationIds, hasSize(1)); - final Set historyUUIDs = historyUUIDs(node1, indexName); + final String historyUUID = historyUUID(node1, indexName); String node2 = internalCluster().startNode(); ensureGreen(indexName); internalCluster().assertSameDocIdsOnShards(); @@ -176,8 +172,8 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale node2 = internalCluster().startNode(); ensureGreen(indexName); - assertThat(historyUUIDs(node1, indexName), everyItem(not(isIn(historyUUIDs)))); - assertThat(historyUUIDs(node1, indexName), equalTo(historyUUIDs(node2, indexName))); + assertThat(historyUUID(node1, indexName), not(equalTo(historyUUID))); + assertThat(historyUUID(node1, indexName), equalTo(historyUUID(node2, indexName))); internalCluster().assertSameDocIdsOnShards(); } @@ -231,11 +227,14 @@ private IndexSettings getIndexSettings(String indexName, String nodeName) { return indexService.getIndexSettings(); } - private Set historyUUIDs(String node, String indexName) { + private String historyUUID(String node, String indexName) { final ShardStats[] shards = client(node).admin().indices().prepareStats(indexName).clear().get().getShards(); assertThat(shards.length, greaterThan(0)); - return Arrays.stream(shards).map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY)) + final Set historyUUIDs = Arrays.stream(shards) + .map(shard -> shard.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY)) .collect(Collectors.toSet()); + assertThat(historyUUIDs, hasSize(1)); + return historyUUIDs.iterator().next(); } private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardId, Path indexPath) throws IOException { From 1d8899ebe64bcc77377c3f3ed5ebfd29ecec6cd4 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 12:12:41 +0100 Subject: [PATCH 21/25] reuse 
ESIntegTestCase.createIndex --- .../cluster/routing/AllocationIdIT.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index dd938eb344bf9..ffbd7462aca2f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -57,7 +57,6 @@ import java.util.stream.Collectors; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -101,7 +100,10 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale final String indexName = "index42"; final String master = internalCluster().startMasterOnlyNode(); String node1 = internalCluster().startNode(); - createIndex(indexName); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum").build()); final int numDocs = indexDocs(indexName, "foo", "bar"); final IndexSettings indexSettings = getIndexSettings(indexName, node1); final Set allocationIds = getAllocationIds(indexName); @@ -184,14 +186,6 @@ public void checkHealthStatus(String indexName, ClusterHealthStatus healthStatus assertThat(indexHealthStatus, is(healthStatus)); } - private void createIndex(String indexName) { - assertAcked(prepareCreate(indexName) - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - 
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))); - } - private int indexDocs(String indexName, Object ... source) throws InterruptedException, ExecutionException { // index some docs in several segments int numDocs = 0; From d141d9ed4d0fe4698d61484119ac19859c35e2ba Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 12:17:22 +0100 Subject: [PATCH 22/25] drop redundant assertBusy --- .../cluster/routing/AllocationIdIT.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index ffbd7462aca2f..2675fe45bcc82 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -238,17 +238,15 @@ private void putFakeCorruptionMarker(IndexSettings indexSettings, ShardId shardI } private void checkNoValidShardCopy(String indexName, ShardId shardId) throws Exception { - assertBusy(() -> { - final ClusterAllocationExplanation explanation = - client().admin().cluster().prepareAllocationExplain() - .setIndex(indexName).setShard(shardId.id()).setPrimary(true) - .get().getExplanation(); - - final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); - assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); - assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), - equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); - }); + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(shardId.id()).setPrimary(true) + .get().getExplanation(); + + final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); + assertThat(shardAllocationDecision.isDecisionTaken(), 
equalTo(true)); + assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), + equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); } } From e4e45c770054fa7ec1f64cc6bda68d6e08f1dc74 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 12:31:21 +0100 Subject: [PATCH 23/25] comment on the reason behind the test --- .../cluster/routing/AllocationIdIT.java | 28 ++++++------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index 2675fe45bcc82..178370d8005a7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -73,27 +73,17 @@ protected Collection> nodePlugins() { } public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStalePrimary() throws Exception { - /* test case to spot the problem if allocation id is adjusted before recovery is done (historyUUID is adjusted): - * ( if recovery is failed on AllocateStalePrimary it requires another AllocateStalePrimary ) + /* + * Allocation id is put on start of shard while historyUUID is adjusted after recovery is done. * - * - Master node + 2 data nodes - * - (1) index some docs - * - stop primary (node1) - * - (2) index more docs to a node2, that marks node1 as stale - * - stop node2 + * If during execution of AllocateStalePrimary a proper allocation id is stored in allocation id set and recovery is failed + * shard restart skips the stage where historyUUID is changed. 
* node1 would not be a new primary due to master node state - it is required to run AllocateStalePrimary - * - put a corruption marker to node1 to interrupt recovery - * - start node1 (shard would not start as it is stale) - * - allocate stale primary - allocation id is adjusted (*) - but it fails due to the presence of corruption marker - * - stop node1 - * - remove a corruption marker - * - try to open index on node1 - * -> node1 has a RED index if (*) points to a fake shard -> node requires another AllocateStalePrimary - * - check that restart does not help - * - another manual intervention of AllocateStalePrimary brings index to yellow state - * - bring node2 back - * -> nodes are fully in-sync and has different to initial history uuid + * That leads to a situation where the allocated stale primary and its replica belong to the same historyUUID, and the + * replica will receive operations after the local checkpoint while documents before it could be significantly different. + * + * Therefore, on AllocateStalePrimary we put a fake allocation id (no real one could be generated like that) + * and any failure during recovery requires an extra AllocateStalePrimary command to be executed.
*/ // initial set up From 2b6d53559a05f5d1fefeacc42809d173abaa5cb5 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 13:45:02 +0100 Subject: [PATCH 24/25] S&R leftover --- .../cluster/routing/allocation/IndexMetaDataUpdater.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java index ebffe82a5a246..54625a15e8d80 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/IndexMetaDataUpdater.java @@ -76,8 +76,7 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha updates.addedAllocationIds.add(startedShard.allocationId().getId()); if (startedShard.primary() // started shard has to have null recoverySource; have to pick up recoverySource from its initializing state - && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE - || initializingShard.recoverySource() instanceof RecoverySource.SnapshotRecoverySource)) { + && (initializingShard.recoverySource() == RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE)) { updates.removedAllocationIds.add(RecoverySource.ExistingStoreRecoverySource.FORCED_ALLOCATION_ID); } } From 07a795bdf9c7f851ea3325a7b45751418275e0c5 Mon Sep 17 00:00:00 2001 From: Vladimir Dolzhenko Date: Mon, 5 Nov 2018 14:20:03 +0100 Subject: [PATCH 25/25] drop useless open index (index is still opened) --- .../org/elasticsearch/cluster/routing/AllocationIdIT.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java index 178370d8005a7..f9da7a1aa8e4b 100644 --- 
a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdIT.java @@ -31,7 +31,6 @@ import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MockEngineFactoryPlugin; @@ -140,9 +139,6 @@ public void testFailedRecoveryOnAllocateStalePrimaryRequiresAnotherAllocateStale store.removeCorruptionMarker(); } - // open index; no any reasons to wait 30 sec as no any health shard for that - client(node1).admin().indices().prepareOpen(indexName).setTimeout(TimeValue.timeValueSeconds(5)).get(); - // index is red: no any shard is allocated (allocation id is a fake id that does not match to anything) checkHealthStatus(indexName, ClusterHealthStatus.RED); checkNoValidShardCopy(indexName, shardId);