diff --git a/src/main/java/org/elasticsearch/indices/IndicesService.java b/src/main/java/org/elasticsearch/indices/IndicesService.java
index fb16a44ec022a..a063ccc5c260d 100644
--- a/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -542,18 +542,38 @@ public void deleteShardStore(String reason, ShardLock lock, Settings indexSettin
      * This method deletes the shard contents on disk for the given shard ID. This method will fail if the shard deleting
      * is prevented by {@link #canDeleteShardContent(org.elasticsearch.index.shard.ShardId, org.elasticsearch.cluster.metadata.IndexMetaData)}
      * of if the shards lock can not be acquired.
+     *
+     * On data nodes, if the deleted shard is the last shard folder in its index, the method will attempt to remove the index folder as well.
+     *
      * @param reason the reason for the shard deletion
      * @param shardId the shards ID to delete
-     * @param metaData the shards index metadata. This is required to access the indexes settings etc.
+     * @param clusterState the current cluster state. This is required to access the index settings etc.
      * @throws IOException if an IOException occurs
      */
-    public void deleteShardStore(String reason, ShardId shardId, IndexMetaData metaData) throws IOException {
+    public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) throws IOException {
+        final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndex());
+
         final Settings indexSettings = buildIndexSettings(metaData);
         if (canDeleteShardContent(shardId, indexSettings) == false) {
             throw new ElasticsearchIllegalStateException("Can't delete shard " + shardId);
         }
         nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
-        logger.trace("{} deleting shard reason [{}]", shardId, reason);
+        logger.debug("{} deleted shard reason [{}]", shardId, reason);
+
+        if (clusterState.nodes().localNode().isMasterNode() == false && // master nodes keep the index metadata, even if they have no shards
+                canDeleteIndexContents(shardId.index(), indexSettings)) {
+            if (nodeEnv.findAllShardIds(shardId.index()).isEmpty()) {
+                try {
+                    // note that deleteIndexStore has more safety checks and may throw an exception if the index was concurrently created
+                    deleteIndexStore("no longer used", metaData, clusterState);
+                } catch (Exception e) {
+                    // wrap the exception to indicate we already deleted the shard
+                    throw new ElasticsearchException("failed to delete unused index after deleting its last shard (" + shardId + ")", e);
+                }
+            } else {
+                logger.trace("[{}] still has shard stores, leaving as is", shardId.index());
+            }
+        }
     }
 
     /**
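For reviewers, here is a minimal, self-contained sketch of the cleanup flow the javadoc above describes: after a shard folder is removed, a non-master data node also removes the index folder once no shard folders remain. The class name, constructor flag and helpers below are illustrative stand-ins using plain java.nio types, not the Elasticsearch APIs used in the patch.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

/** Illustrative stand-in for the new deleteShardStore() behaviour; not Elasticsearch code. */
public class DataNodeShardCleaner {

    private final boolean localNodeIsMasterEligible;

    public DataNodeShardCleaner(boolean localNodeIsMasterEligible) {
        this.localNodeIsMasterEligible = localNodeIsMasterEligible;
    }

    /**
     * Deletes shardPath and, if it was the last shard folder under indexPath, deletes the
     * index folder as well. Master-eligible nodes keep the index folder (they hold the metadata).
     */
    public void deleteShardStore(Path indexPath, Path shardPath) throws IOException {
        deleteRecursively(shardPath);

        if (localNodeIsMasterEligible == false && hasNoShardFolders(indexPath)) {
            try {
                deleteRecursively(indexPath);
            } catch (IOException e) {
                // mirror the patch: wrap to signal that the shard itself was already deleted
                throw new IOException("failed to delete unused index after deleting its last shard (" + shardPath + ")", e);
            }
        }
    }

    private static boolean hasNoShardFolders(Path indexPath) throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
            for (Path child : stream) {
                if (Files.isDirectory(child)) {
                    return false; // at least one shard folder is still present
                }
            }
        }
        return true;
    }

    private static void deleteRecursively(Path path) throws IOException {
        if (Files.notExists(path)) {
            return;
        }
        Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
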
+ deleteIndexStore("no longer used", metaData, clusterState); + } catch (Exception e) { + // wrap the exception to indicate we already deleted the shard + throw new ElasticsearchException("failed to delete unused index after deleting it's last shard (" + shardId + ")", e); + } + } else { + logger.trace("[{}] still has shard stores, leaving as is"); + } + } } /** diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index afe68559b22ce..cf9f617b0cbfa 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -22,7 +22,6 @@ import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.Version; import org.elasticsearch.cluster.*; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -49,7 +48,6 @@ import org.elasticsearch.transport.*; import java.io.Closeable; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; @@ -297,28 +295,18 @@ private void allNodesResponded() { return; } - clusterService.submitStateUpdateTask("indices_store", new ClusterStateNonMasterUpdateTask() { + clusterService.submitStateUpdateTask("indices_store ([" + shardId + "] active fully on other nodes)", new ClusterStateNonMasterUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { if (clusterState.getVersion() != currentState.getVersion()) { logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion()); return currentState; } - IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex()); try { - indicesService.deleteShardStore("no longer used", shardId, indexMeta); + indicesService.deleteShardStore("no longer used", shardId, currentState); } catch (Throwable ex) { logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId); } - // if the index doesn't exists anymore, delete its store as well, but only if its a non master node, since master - // nodes keep the index metadata around - if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) { - try { - indicesService.deleteIndexStore("no longer used", indexMeta, currentState); - } catch (Throwable ex) { - logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex()); - } - } return currentState; } diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java index 4a9fdf4448030..a33bbec9e1d97 100644 --- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java +++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java @@ -22,6 +22,7 @@ import com.google.common.base.Predicate; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import 
diff --git a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
index 4a9fdf4448030..a33bbec9e1d97 100644
--- a/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationTests.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Predicate;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.ClusterState;
@@ -31,6 +32,8 @@
 import org.elasticsearch.cluster.routing.*;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.settings.ImmutableSettings;
@@ -40,12 +43,14 @@
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.RecoverySource;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
 import org.elasticsearch.test.disruption.SingleNodeDisruption;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportModule;
 import org.elasticsearch.transport.TransportRequestOptions;
@@ -56,6 +61,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -214,6 +220,86 @@ public void shardsCleanup() throws Exception {
         assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
     }
 
+
+    @TestLogging("cluster.service:TRACE")
+    public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception {
+        Future<String> masterFuture = internalCluster().startNodeAsync(
+                ImmutableSettings.builder().put(SETTINGS).put("node.master", true, "node.data", false).build());
+        Future<List<String>> nodesFutures = internalCluster().startNodesAsync(4,
+                ImmutableSettings.builder().put(SETTINGS).put("node.master", false, "node.data", true).build());
+
+        final String masterNode = masterFuture.get();
+        final String node1 = nodesFutures.get().get(0);
+        final String node2 = nodesFutures.get().get(1);
+        final String node3 = nodesFutures.get().get(2);
+        // we will use this later on, handy to start now to make sure it has a different data folder than nodes 1, 2 & 3
+        final String node4 = nodesFutures.get().get(3);
+
+        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                        .put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_name", node4)
+        ));
+        assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setWaitForGreenStatus().setWaitForNodes("5").get().isTimedOut());
+
+        // disable allocation to control the situation more easily
+        assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder()
+                .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")));
+
+        logger.debug("--> shutting down two random nodes");
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1, node2, node3));
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1, node2, node3));
+
+ logger.debug("--> verifying index is red"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").get(); + if (health.getStatus() != ClusterHealthStatus.RED) { + logClusterState(); + fail("cluster didn't become red, despite of shutting 2 of 3 nodes"); + } + + logger.debug("--> allowing index to be assigned to node [{}]", node4); + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings( + ImmutableSettings.builder() + .put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_name", "NONE"))); + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all"))); + + logger.debug("--> waiting for shards to recover on [{}]", node4); + // we have to do this in two steps as we now do async shard fetching before assigning, so the change to the + // allocation filtering may not have immediate effect + // TODO: we should add an easier to do this. It's too much of a song and dance.. + assertBusy(new Runnable() { + @Override + public void run() { + assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex("test")); + } + }); + + // wait for 4 active shards - we should have lost one shard + assertFalse(client().admin().cluster().prepareHealth().setWaitForActiveShards(4).get().isTimedOut()); + + // disable allocation again to control concurrency a bit and allow shard active to kick in before allocation + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none"))); + + logger.debug("--> starting the two old nodes back"); + + internalCluster().startNodesAsync(2, + ImmutableSettings.builder().put(SETTINGS).put("node.master", false, "node.data", true).build()); + + assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("5").get().isTimedOut()); + + + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder() + .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all"))); + + logger.debug("--> waiting for the lost shard to be recovered"); + + ensureGreen("test"); + + } + @Test public void testShardActiveElseWhere() throws Exception { boolean node1IsMasterEligible = randomBoolean();