Closed
26 changes: 23 additions & 3 deletions src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -542,18 +542,38 @@ public void deleteShardStore(String reason, ShardLock lock, Settings indexSettin
* This method deletes the shard contents on disk for the given shard ID. This method will fail if the shard deleting
* is prevented by {@link #canDeleteShardContent(org.elasticsearch.index.shard.ShardId, org.elasticsearch.cluster.metadata.IndexMetaData)}
* of if the shards lock can not be acquired.
*
* On data nodes, if the deleted shard is the last shard folder in it's index, the method will attempt to remove the index folder as well.
*
Review comment (Contributor):

"its"

* @param reason the reason for the shard deletion
* @param shardId the shards ID to delete
* @param metaData the shards index metadata. This is required to access the indexes settings etc.
* @param clusterState . This is required to access the indexes settings etc.
* @throws IOException if an IOException occurs
*/
public void deleteShardStore(String reason, ShardId shardId, IndexMetaData metaData) throws IOException {
public void deleteShardStore(String reason, ShardId shardId, ClusterState clusterState) throws IOException {
final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndex());

final Settings indexSettings = buildIndexSettings(metaData);
if (canDeleteShardContent(shardId, indexSettings) == false) {
throw new ElasticsearchIllegalStateException("Can't delete shard " + shardId);
}
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
logger.trace("{} deleting shard reason [{}]", shardId, reason);
logger.debug("{} deleted shard reason [{}]", shardId, reason);
Review comment (Member):

I'd much rather have the logging prior to the deletion, so we can at least see the shard id in the logs if something goes awry during the deletion.

Review comment (Contributor):

There is already logging prior to deletion, at the trace level, inside deleteShardDirectorySafe(....). We could change it to debug, but I think that would be too much noise. Are you suggesting changing this one back to trace and changing the other one to debug?

Review comment (Member):

Ahh okay, if there's already one inside deleteShardDirectorySafe then leaving it as-is is fine with me :)
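
For illustration, here is a minimal sketch of why the position of the log statement matters. It is not the Elasticsearch code: `LogBeforeDelete`, `deleteDirectory` and the in-memory `log` list are hypothetical stand-ins, and the deletion step is made to fail on purpose.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class LogBeforeDelete {
    // In-memory stand-in for a logger, so the recorded lines can be inspected.
    final List<String> log = new ArrayList<>();

    // Hypothetical deletion step that always fails, to simulate an I/O error.
    private void deleteDirectory(String shardId) throws IOException {
        throw new IOException("disk error while deleting " + shardId);
    }

    void deleteShard(String shardId, String reason) throws IOException {
        // Recorded even if the deletion below throws.
        log.add(shardId + " deleting shard reason [" + reason + "]");
        deleteDirectory(shardId);
        // Only reached when the deletion succeeded.
        log.add(shardId + " deleted shard reason [" + reason + "]");
    }
}
```

With only the second statement, a failed deletion would leave no trace of the shard id at this level; with the first, the id is recorded before anything can go wrong.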


if (clusterState.nodes().localNode().isMasterNode() == false && // master nodes keep the index meta data, even if having no shards..
canDeleteIndexContents(shardId.index(), indexSettings)) {
if (nodeEnv.findAllShardIds(shardId.index()).isEmpty()) {
Review comment (Member):

I wonder whether we should move this check into deleteIndexStore(), where it could be enabled via a boolean option? We only want a recursive delete of the index directory when an index actually gets deleted.

Review comment (Member):

Maybe instead we should add a recursive boolean parameter to deleteIndexStore() and enable it only when we delete an index. If this makes sense, then I think we should make this change only in master.

Review comment (Contributor):

By "only when we delete an index" you mean via the delete index api?

Review comment (Member):

@brwe yes, via the delete index api.

Review comment (Contributor):

I thought this was to make sure that data nodes that are not master-eligible do not keep metadata around for indices once they no longer have any shards allocated on them. At least, that is why I made #9985, which I think introduced the bug.
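
For illustration, the `recursive` flag proposed earlier in this thread could look roughly like the sketch below. It uses plain `java.nio.file` rather than the real `deleteIndexStore()` and `NodeEnvironment` code; `IndexDirCleanup` and `deleteIndexDir` are hypothetical names, and the safety checks the real method performs are left out.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class IndexDirCleanup {
    // Hypothetical helper, not the real IndicesService.deleteIndexStore signature.
    // Returns true if the index directory was removed.
    static boolean deleteIndexDir(Path indexDir, boolean recursive) throws IOException {
        if (recursive) {
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(indexDir)) {
                // Reverse order puts children before their parent directories.
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : paths) {
                Files.delete(p);
            }
            return true;
        }
        // Non-recursive mode: only remove the directory if nothing is left in it.
        try (Stream<Path> children = Files.list(indexDir)) {
            if (children.findAny().isPresent()) {
                return false;
            }
        }
        Files.delete(indexDir);
        return true;
    }
}
```

Under this assumption, the shard cleanup path would call the helper with `recursive == false`, so an index directory that still holds shard folders is left alone, and only an explicit index deletion would pass `true`.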

try {
// note that deleteIndexStore have more safety checks and may throw an exception if index was concurrently created.
deleteIndexStore("no longer used", metaData, clusterState);
} catch (Exception e) {
// wrap the exception to indicate we already deleted the shard
throw new ElasticsearchException("failed to delete unused index after deleting it's last shard (" + shardId + ")", e);
}
} else {
logger.trace("[{}] still has shard stores, leaving as is");
Review comment (Member):

This logging statement has nothing to fill the {}.
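
For illustration, the sketch below shows what happens to a `{}` placeholder that has no argument. `PlaceholderDemo.format` is a hypothetical stand-in for a `{}`-style message formatter, not the actual ESLogger implementation, so the exact behavior of the real logger is an assumption here.

```java
final class PlaceholderDemo {
    // Hypothetical "{}"-style formatter: each argument replaces the next placeholder.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders: the extras are ignored
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        // Any placeholder without a matching argument is copied through verbatim.
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        String pattern = "[{}] still has shard stores, leaving as is";
        System.out.println(format(pattern));         // placeholder stays unfilled
        System.out.println(format(pattern, "test")); // placeholder replaced by the index name
    }
}
```

In the statement under review, passing the index (for example `shardId.index()`) as the argument would presumably be the fix.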

}
}
}

/**
16 changes: 2 additions & 14 deletions src/main/java/org/elasticsearch/indices/store/IndicesStore.java
@@ -22,7 +22,6 @@
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -49,7 +48,6 @@
import org.elasticsearch.transport.*;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -297,28 +295,18 @@ private void allNodesResponded() {
return;
}

clusterService.submitStateUpdateTask("indices_store", new ClusterStateNonMasterUpdateTask() {
clusterService.submitStateUpdateTask("indices_store ([" + shardId + "] active fully on other nodes)", new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (clusterState.getVersion() != currentState.getVersion()) {
logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion());
return currentState;
}
IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
try {
indicesService.deleteShardStore("no longer used", shardId, indexMeta);
indicesService.deleteShardStore("no longer used", shardId, currentState);
} catch (Throwable ex) {
logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId);
}
// if the index doesn't exists anymore, delete its store as well, but only if its a non master node, since master
// nodes keep the index metadata around
if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) {
try {
indicesService.deleteIndexStore("no longer used", indexMeta, currentState);
} catch (Throwable ex) {
logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex());
}
}
return currentState;
}

@@ -22,6 +22,7 @@
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
@@ -31,6 +32,8 @@
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ImmutableSettings;
@@ -40,12 +43,14 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportRequestOptions;
@@ -56,6 +61,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -214,6 +220,86 @@ public void shardsCleanup() throws Exception {
assertThat(waitForShardDeletion(node_4, "test", 0), equalTo(false));
}


@TestLogging("cluster.service:TRACE")
Review comment (Member):

Annotate with @Test also?

public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception {
Future<String> masterFuture = internalCluster().startNodeAsync(
ImmutableSettings.builder().put(SETTINGS).put("node.master", true, "node.data", false).build());
Future<List<String>> nodesFutures = internalCluster().startNodesAsync(4,
ImmutableSettings.builder().put(SETTINGS).put("node.master", false, "node.data", true).build());

final String masterNode = masterFuture.get();
final String node1 = nodesFutures.get().get(0);
final String node2 = nodesFutures.get().get(1);
final String node3 = nodesFutures.get().get(2);
// we will use this later on, handy to start now to make sure it has a different data folder that node 1,2 &3
final String node4 = nodesFutures.get().get(3);

assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_name", node4)
));
assertFalse(client().admin().cluster().prepareHealth().setWaitForRelocatingShards(0).setWaitForGreenStatus().setWaitForNodes("5").get().isTimedOut());

// disable allocation to control the situation more easily
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")));

logger.debug("--> shutting down two random nodes");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1, node2, node3));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1, node2, node3));
Review comment (Member):

This can fail, I think, because you could end up shutting down the same node twice (node1, then node1). Or do we have a check somewhere that only shuts down active nodes?

Review comment (Contributor):

The list node1, node2, node3 works here as a filter, not as a selector. So I don't think it can select node1 twice: the second time, node1 should no longer be available for selection.
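
For illustration, the filter-versus-selector distinction can be modeled with a toy cluster. This is not `InternalTestCluster`: `TinyCluster` and its methods are hypothetical, and the sketch assumes that a stopped node is removed from the set of running nodes before the next call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;

final class TinyCluster {
    private final Set<String> running = new LinkedHashSet<>();
    private final Random random;

    TinyCluster(Random random, String... nodes) {
        this.random = random;
        running.addAll(Arrays.asList(nodes));
    }

    // Stops one random running node accepted by the filter.
    // Returns the stopped node's name, or null if no running node matches.
    String stopRandomNode(Predicate<String> filter) {
        List<String> candidates = new ArrayList<>();
        for (String node : running) {
            if (filter.test(node)) {
                candidates.add(node);
            }
        }
        if (candidates.isEmpty()) {
            return null;
        }
        String chosen = candidates.get(random.nextInt(candidates.size()));
        running.remove(chosen); // a stopped node can no longer be a candidate
        return chosen;
    }

    Set<String> running() {
        return running;
    }
}
```

Because the filter is applied to the nodes that are still running, two consecutive calls with the same filter stop two different nodes in this model.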


logger.debug("--> verifying index is red");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
if (health.getStatus() != ClusterHealthStatus.RED) {
logClusterState();
fail("cluster didn't become red, despite of shutting 2 of 3 nodes");
}

logger.debug("--> allowing index to be assigned to node [{}]", node4);
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(
ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "_name", "NONE")));

assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all")));

logger.debug("--> waiting for shards to recover on [{}]", node4);
// we have to do this in two steps as we now do async shard fetching before assigning, so the change to the
// allocation filtering may not have immediate effect
// TODO: we should add an easier to do this. It's too much of a song and dance..
assertBusy(new Runnable() {
@Override
public void run() {
assertTrue(internalCluster().getInstance(IndicesService.class, node4).hasIndex("test"));
}
});

// wait for 4 active shards - we should have lost one shard
assertFalse(client().admin().cluster().prepareHealth().setWaitForActiveShards(4).get().isTimedOut());

// disable allocation again to control concurrency a bit and allow shard active to kick in before allocation
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")));

logger.debug("--> starting the two old nodes back");

internalCluster().startNodesAsync(2,
ImmutableSettings.builder().put(SETTINGS).put("node.master", false, "node.data", true).build());

assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("5").get().isTimedOut());


assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all")));

logger.debug("--> waiting for the lost shard to be recovered");

ensureGreen("test");

}

@Test
public void testShardActiveElseWhere() throws Exception {
boolean node1IsMasterEligible = randomBoolean();