Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.gateway.AsyncShardFetch.FetchResult;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardStateMetaData;
Expand Down Expand Up @@ -256,6 +257,11 @@ private static Map<String, Decision> buildNodeDecisions(NodesToAllocate nodesToA
return nodeDecisions;
}

private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR =
Comparator.comparing((NodeGatewayStartedShards state) -> state.storeException() == null).reversed();
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR =
Comparator.comparing(NodeGatewayStartedShards::primary).reversed();

/**
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
Expand All @@ -265,8 +271,7 @@ protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRo
Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
FetchResult<NodeGatewayStartedShards> shardState,
Logger logger) {
LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>();
LinkedList<NodeGatewayStartedShards> nonMatchingNodeShardStates = new LinkedList<>();
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
int numberOfAllocationsFound = 0;
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
DiscoveryNode node = nodeShardState.getNode();
Expand All @@ -287,31 +292,36 @@ protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRo
}
} else {
final String finalAllocationId = allocationId;
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
allocationId = null;
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
} else {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
allocationId = null;
}
}

if (allocationId != null) {
assert nodeShardState.storeException() == null ||
nodeShardState.storeException() instanceof ShardLockObtainFailedException :
"only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException();
numberOfAllocationsFound++;
if (inSyncAllocationIds.contains(allocationId)) {
if (nodeShardState.primary()) {
matchingNodeShardStates.addFirst(nodeShardState);
} else {
matchingNodeShardStates.addLast(nodeShardState);
}
} else if (matchAnyShard) {
if (nodeShardState.primary()) {
nonMatchingNodeShardStates.addFirst(nodeShardState);
} else {
nonMatchingNodeShardStates.addLast(nodeShardState);
}
if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {
nodeShardStates.add(nodeShardState);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yay!


List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
nodeShardStates.addAll(matchingNodeShardStates);
nodeShardStates.addAll(nonMatchingNodeShardStates);
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
if (matchAnyShard) {
// prefer shards with matching allocation ids
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fancy pants.

comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR);
} else {
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
}

nodeShardStates.sort(comparator);

if (logger.isTraceEnabled()) {
logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")));
Expand Down Expand Up @@ -412,10 +422,19 @@ static NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, bo
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId());
}
} else {
final long finalVerison = version;
// when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist)
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVerison), nodeShardState.storeException());
version = ShardStateMetaData.NO_VERSION;
final long finalVersion = version;
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
if (nodeShardState.allocationId() != null) {
version = Long.MAX_VALUE; // shard was already selected in a 5.x cluster as primary, prefer this shard copy again.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you run into this being a problem? how can we open an index that was created before 5.0.0 and never had insync replicas but does have allocationId? the only thing I can think of is a node network issue during shard initialization. I'm wondering if we need to optimize for this and no keep this code simple (i.e., demote shards with a lock exception)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We special-case this a few lines up as well (but it's not easy to do code reuse across those lines). For symmetry reasons I have kept it as is. The code is documented as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough

} else {
version = 0L; // treat as lowest version so that this shard is the least likely to be selected as primary
}
} else {
// disregard the reported version and assign it as no version (same as shard does not exist)
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
version = ShardStateMetaData.NO_VERSION;
}
}

if (version != ShardStateMetaData.NO_VERSION) {
Expand Down
5 changes: 1 addition & 4 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,15 +414,12 @@ public static boolean canOpenIndex(Logger logger, Path indexLocation, ShardId sh
* segment infos and possible corruption markers. If the index can not
* be opened, an exception is thrown
*/
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException {
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException, ShardLockObtainFailedException {
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
} catch (ShardLockObtainFailedException ex) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("{} unable to acquire shard lock", shardId), ex);
throw new IOException(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardStateMetaData;
import org.elasticsearch.snapshots.Snapshot;
Expand Down Expand Up @@ -174,6 +175,69 @@ public void testStoreException() {
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

/**
* Tests that when the node returns a ShardLockObtainFailedException, it will be considered as a valid shard copy
*/
public void testShardLockObtainFailedException() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add tests for cases with more shards? for example the case where we "prefer" other shard copies with this exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I've added more tests

final RoutingAllocation allocation;
boolean useAllocationIds = randomBoolean();
if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(),
new ShardLockObtainFailedException(shardId, "test"));
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
}
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
if (useAllocationIds) {
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
}
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

/**
* Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will
* select the second node as target
*/
public void testShardLockObtainFailedExceptionPreferOtherValidCopies() {
final RoutingAllocation allocation;
boolean useAllocationIds = randomBoolean();
String allocId1 = randomAsciiOfLength(10);
String allocId2 = randomAsciiOfLength(10);
if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
randomFrom(Version.V_2_0_0, Version.CURRENT), allocId1, allocId2);
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, allocId1, randomBoolean(),
new ShardLockObtainFailedException(shardId, "test"));
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, allocId2, randomBoolean(), null);
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
if (randomBoolean()) {
testAllocator.addData(node2, randomIntBetween(2, 4), null, randomBoolean(), null);
} else {
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some alloc id", randomBoolean(), null);
}
}
testAllocator.allocateUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
if (useAllocationIds) {
// check that allocation id is reused
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2));
}
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
}

/**
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
*/
Expand Down