diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java
index ca19c142d956c..06cf97a3985e4 100644
--- a/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -471,12 +471,14 @@ public void deleteIndexStore(String reason, IndexMetaData metaData) throws IOExc
         if (nodeEnv.hasNodeFile()) {
             synchronized (this) {
                 String indexName = metaData.index();
-                if (indices.containsKey(metaData.index())) {
-                    String localUUid = indices.get(metaData.index()).v1().indexUUID();
-                    throw new ElasticsearchIllegalStateException("Can't delete index store for [" + metaData.getIndex() + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]");
+                if (indices.containsKey(indexName)) {
+                    String localUUid = indices.get(indexName).v1().indexUUID();
+                    throw new ElasticsearchIllegalStateException("Can't delete index store for [" + indexName + "] - it's still part of the indices service [" + localUUid + "] [" + metaData.getUUID() + "]");
                 }
                 ClusterState clusterState = clusterService.state();
-                if (clusterState.metaData().hasIndex(indexName)) {
+                if (clusterState.metaData().hasIndex(indexName) && (clusterState.nodes().localNode().masterNode() == true)) {
+                    // we do not delete the store if it is a master eligible node and the index is still in the cluster state
+                    // because we want to keep the meta data for indices around even if no shards are left here
                     final IndexMetaData index = clusterState.metaData().index(indexName);
                     throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getUUID() + "] [" + metaData.getUUID() + "]");
                 }
diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
index 4cdebbfbb90db..7d43e9e90758c 100644
--- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
+++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -296,9 +296,18 @@ public ClusterState execute(ClusterState currentState) throws Exception {
                 IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
                 try {
                     indicesService.deleteShardStore("no longer used", shardId, indexMeta);
-                } catch (Exception ex) {
+                } catch (Throwable ex) {
                     logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId);
                 }
+                // if the index doesn't exist anymore, delete its store as well, but only if it's a non-master node, since master
+                // nodes keep the index metadata around
+                if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) {
+                    try {
+                        indicesService.deleteIndexStore("no longer used", indexMeta);
+                    } catch (Throwable ex) {
+                        logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex());
+                    }
+                }
                 return currentState;
             }
diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
index 21ce5bbb5a695..ad71c70d17f73 100644
--- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
@@ -27,11 +27,13 @@
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.DiscoveryService;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
@@ -41,6 +43,8 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
@@ -53,6 +57,57 @@
 @ClusterScope(scope= Scope.TEST, numDataNodes = 0)
 public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
 
+    @Test
+    public void indexCleanup() throws Exception {
+        final String masterNode = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false));
+        final String node_1 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
+        final String node_2 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
+        logger.info("--> creating index [test] with one shard and one replica");
+        assertAcked(prepareCreate("test").setSettings(
+                        ImmutableSettings.builder().put(indexSettings())
+                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))
+        );
+        ensureGreen("test");
+
+        logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2");
+        assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true));
+        assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true));
+        assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true));
+        assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
+
+        logger.info("--> starting node server3");
+        final String node_3 = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
+        logger.info("--> running cluster_health");
+        ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
+                .setWaitForNodes("4")
+                .setWaitForRelocatingShards(0)
+                .get();
+        assertThat(clusterHealth.isTimedOut(), equalTo(false));
+
+        assertThat(Files.exists(shardDirectory(node_1, "test", 0)), equalTo(true));
+        assertThat(Files.exists(indexDirectory(node_1, "test")), equalTo(true));
+        assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true));
+        assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
+        assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(false));
+        assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(false));
+
+        logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
+        internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(new ShardId("test", 0), node_1, node_3)).get();
+        clusterHealth = client().admin().cluster().prepareHealth()
+                .setWaitForNodes("4")
+                .setWaitForRelocatingShards(0)
+                .get();
+        assertThat(clusterHealth.isTimedOut(), equalTo(false));
+
+        assertThat(waitForShardDeletion(node_1, "test", 0), equalTo(false));
+        assertThat(waitForIndexDeletion(node_1, "test"), equalTo(false));
+        assertThat(Files.exists(shardDirectory(node_2, "test", 0)), equalTo(true));
+        assertThat(Files.exists(indexDirectory(node_2, "test")), equalTo(true));
+        assertThat(Files.exists(shardDirectory(node_3, "test", 0)), equalTo(true));
+        assertThat(Files.exists(indexDirectory(node_3, "test")), equalTo(true));
+    }
+
     @Test
     public void shardsCleanup() throws Exception {
         final String node_1 = internalCluster().startNode();
@@ -115,26 +170,43 @@ public void shardsCleanup() throws Exception {
 
     @Test
     public void testShardActiveElseWhere() throws Exception {
-        String node_1 = internalCluster().startNode();
-        String node_2 = internalCluster().startNode();
+        boolean node1IsMasterEligible = randomBoolean();
+        boolean node2IsMasterEligible = !node1IsMasterEligible || randomBoolean();
+        Future<String> node_1_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node1IsMasterEligible).build());
+        Future<String> node_2_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node2IsMasterEligible).build());
+        final String node_1 = node_1_future.get();
+        final String node_2 = node_2_future.get();
         final String node_1_id = internalCluster().getInstance(DiscoveryService.class, node_1).localNode().getId();
         final String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().getId();
+        logger.debug("node {} (node_1) is {}master eligible", node_1, node1IsMasterEligible ? "" : "not ");
+        logger.debug("node {} (node_2) is {}master eligible", node_2, node2IsMasterEligible ? "" : "not ");
+        logger.debug("node {} became master", internalCluster().getMasterName());
         final int numShards = scaledRandomIntBetween(2, 20);
         assertAcked(prepareCreate("test")
                         .setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards))
         );
         ensureGreen("test");
+        waitNoPendingTasksOnAll();
         ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
+
         RoutingNode routingNode = stateResponse.getState().routingNodes().node(node_2_id);
-        int[] node2Shards = new int[routingNode.numberOfOwningShards()];
+        final int[] node2Shards = new int[routingNode.numberOfOwningShards()];
         int i = 0;
         for (MutableShardRouting mutableShardRouting : routingNode) {
-            node2Shards[i++] = mutableShardRouting.shardId().id();
+            node2Shards[i] = mutableShardRouting.shardId().id();
+            i++;
         }
         logger.info("Node 2 has shards: {}", Arrays.toString(node2Shards));
-        waitNoPendingTasksOnAll();
+        final long shardVersions[] = new long[numShards];
+        final int shardIds[] = new int[numShards];
+        i=0;
+        for (ShardRouting shardRouting : stateResponse.getState().getRoutingTable().allShards("test")) {
+            shardVersions[i] = shardRouting.version();
+            shardIds[i] = shardRouting.getId();
+            i++;
+        }
         internalCluster().getInstance(ClusterService.class, node_2).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) throws Exception {
@@ -142,7 +214,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
                 for (int i = 0; i < numShards; i++) {
                     indexRoutingTableBuilder.addIndexShard(
                             new IndexShardRoutingTable.Builder(new ShardId("test", i), false)
-                                    .addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, 1))
+                                    .addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, shardVersions[shardIds[i]]))
                             .build()
                     );
                 }
@@ -166,6 +238,11 @@ public void onFailure(String source, Throwable t) {
         }
     }
 
+    private Path indexDirectory(String server, String index) {
+        NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
+        return env.indexPaths(new Index(index))[0];
+    }
+
     private Path shardDirectory(String server, String index, int shard) {
         NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
         return env.shardPaths(new ShardId(index, shard))[0];
@@ -181,5 +258,13 @@ public boolean apply(Object o) {
         return Files.exists(shardDirectory(server, index, shard));
     }
 
-
+    private boolean waitForIndexDeletion(final String server, final String index) throws InterruptedException {
+        awaitBusy(new Predicate<Object>() {
+            @Override
+            public boolean apply(Object o) {
+                return !Files.exists(indexDirectory(server, index));
+            }
+        });
+        return Files.exists(indexDirectory(server, index));
+    }
 }
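
The behavioral core of this change is the guard added in IndicesStore: an index's on-disk store is deleted only when the local node no longer hosts the index and the node is not master eligible, because master-eligible nodes keep index metadata around even when they hold no shards. The standalone Java sketch below restates that rule outside the Elasticsearch codebase; ClusterView, mayDeleteIndexStore, and the other names are hypothetical stand-ins for what the real change reads from ClusterState and IndicesService, not Elasticsearch APIs.

// Standalone illustration (not Elasticsearch code) of the deletion rule added above.
public final class IndexStoreCleanupRule {

    /** Minimal, hypothetical view of the cluster facts the rule depends on. */
    public interface ClusterView {
        boolean localNodeHasIndex(String index);   // does the local indices service still track the index?
        boolean localNodeIsMasterEligible();       // roughly: node.master is not set to false
    }

    /**
     * True when the on-disk index store may be removed: the index is no longer
     * assigned locally and the node is not master eligible (master-eligible
     * nodes keep index metadata around even when no shards are left on them).
     */
    public static boolean mayDeleteIndexStore(ClusterView view, String index) {
        return view.localNodeHasIndex(index) == false
                && view.localNodeIsMasterEligible() == false;
    }

    public static void main(String[] args) {
        ClusterView dataOnlyNodeWithoutIndex = new ClusterView() {
            @Override
            public boolean localNodeHasIndex(String index) {
                return false;
            }

            @Override
            public boolean localNodeIsMasterEligible() {
                return false;
            }
        };
        // Prints "true": a data-only node that no longer hosts "test" may delete its index store.
        System.out.println(mayDeleteIndexStore(dataOnlyNodeWithoutIndex, "test"));
    }
}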