Skip to content

Commit 236f6e8

Browse files
committed
Allow master to assign primary shard to node that has shard store locked during shard state fetching (#21656)
PR #19416 added a safety mechanism to shard state fetching so that the store is only accessed when the shard lock can be acquired. However, this can lead to a situation where a shard has not yet fully shut down while shard fetching is in progress, resulting in a ShardLockObtainFailedException. PrimaryShardAllocator, which decides where to allocate primary shards, sees this exception and treats the shard copy as unusable. If this is the only shard copy in the cluster, the cluster stays red, and a new shard fetching cycle is not triggered because shard state fetching treats exceptions while opening the store as permanent failures. This commit makes PrimaryShardAllocator treat the locked shard as a possible allocation target (although with the lowest priority).
1 parent 501745a commit 236f6e8

File tree

3 files changed

+107
-27
lines changed

3 files changed

+107
-27
lines changed

core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.settings.Setting;
3939
import org.elasticsearch.common.settings.Setting.Property;
4040
import org.elasticsearch.common.settings.Settings;
41+
import org.elasticsearch.env.ShardLockObtainFailedException;
4142
import org.elasticsearch.gateway.AsyncShardFetch.FetchResult;
4243
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
4344
import org.elasticsearch.index.shard.ShardStateMetaData;
@@ -256,6 +257,11 @@ private static Map<String, Decision> buildNodeDecisions(NodesToAllocate nodesToA
256257
return nodeDecisions;
257258
}
258259

260+
private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR =
261+
Comparator.comparing((NodeGatewayStartedShards state) -> state.storeException() == null).reversed();
262+
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR =
263+
Comparator.comparing(NodeGatewayStartedShards::primary).reversed();
264+
259265
/**
260266
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
261267
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
@@ -265,8 +271,7 @@ protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRo
265271
Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
266272
FetchResult<NodeGatewayStartedShards> shardState,
267273
Logger logger) {
268-
LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>();
269-
LinkedList<NodeGatewayStartedShards> nonMatchingNodeShardStates = new LinkedList<>();
274+
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
270275
int numberOfAllocationsFound = 0;
271276
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
272277
DiscoveryNode node = nodeShardState.getNode();
@@ -287,31 +292,36 @@ protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRo
287292
}
288293
} else {
289294
final String finalAllocationId = allocationId;
290-
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
291-
allocationId = null;
295+
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
296+
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
297+
} else {
298+
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId), nodeShardState.storeException());
299+
allocationId = null;
300+
}
292301
}
293302

294303
if (allocationId != null) {
304+
assert nodeShardState.storeException() == null ||
305+
nodeShardState.storeException() instanceof ShardLockObtainFailedException :
306+
"only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException();
295307
numberOfAllocationsFound++;
296-
if (inSyncAllocationIds.contains(allocationId)) {
297-
if (nodeShardState.primary()) {
298-
matchingNodeShardStates.addFirst(nodeShardState);
299-
} else {
300-
matchingNodeShardStates.addLast(nodeShardState);
301-
}
302-
} else if (matchAnyShard) {
303-
if (nodeShardState.primary()) {
304-
nonMatchingNodeShardStates.addFirst(nodeShardState);
305-
} else {
306-
nonMatchingNodeShardStates.addLast(nodeShardState);
307-
}
308+
if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {
309+
nodeShardStates.add(nodeShardState);
308310
}
309311
}
310312
}
311313

312-
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
313-
nodeShardStates.addAll(matchingNodeShardStates);
314-
nodeShardStates.addAll(nonMatchingNodeShardStates);
314+
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
315+
if (matchAnyShard) {
316+
// prefer shards with matching allocation ids
317+
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
318+
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed();
319+
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR);
320+
} else {
321+
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
322+
}
323+
324+
nodeShardStates.sort(comparator);
315325

316326
if (logger.isTraceEnabled()) {
317327
logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")));
@@ -412,10 +422,19 @@ static NodeShardsResult buildVersionBasedNodeShardsResult(ShardRouting shard, bo
412422
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId());
413423
}
414424
} else {
415-
final long finalVerison = version;
416-
// when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist)
417-
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVerison), nodeShardState.storeException());
418-
version = ShardStateMetaData.NO_VERSION;
425+
final long finalVersion = version;
426+
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
427+
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
428+
if (nodeShardState.allocationId() != null) {
429+
version = Long.MAX_VALUE; // shard was already selected in a 5.x cluster as primary, prefer this shard copy again.
430+
} else {
431+
version = 0L; // treat as lowest version so that this shard is the least likely to be selected as primary
432+
}
433+
} else {
434+
// disregard the reported version and assign it as no version (same as shard does not exist)
435+
logger.trace((Supplier<?>) () -> new ParameterizedMessage("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", shard, nodeShardState.getNode(), finalVersion), nodeShardState.storeException());
436+
version = ShardStateMetaData.NO_VERSION;
437+
}
419438
}
420439

421440
if (version != ShardStateMetaData.NO_VERSION) {

core/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -414,15 +414,12 @@ public static boolean canOpenIndex(Logger logger, Path indexLocation, ShardId sh
414414
* segment infos and possible corruption markers. If the index can not
415415
* be opened, an exception is thrown
416416
*/
417-
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException {
417+
public static void tryOpenIndex(Path indexLocation, ShardId shardId, NodeEnvironment.ShardLocker shardLocker, Logger logger) throws IOException, ShardLockObtainFailedException {
418418
try (ShardLock lock = shardLocker.lock(shardId, TimeUnit.SECONDS.toMillis(5));
419419
Directory dir = new SimpleFSDirectory(indexLocation)) {
420420
failIfCorrupted(dir, shardId);
421421
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
422422
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
423-
} catch (ShardLockObtainFailedException ex) {
424-
logger.error((Supplier<?>) () -> new ParameterizedMessage("{} unable to acquire shard lock", shardId), ex);
425-
throw new IOException(ex);
426423
}
427424
}
428425

core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.UUIDs;
4747
import org.elasticsearch.common.settings.Settings;
4848
import org.elasticsearch.common.util.set.Sets;
49+
import org.elasticsearch.env.ShardLockObtainFailedException;
4950
import org.elasticsearch.index.shard.ShardId;
5051
import org.elasticsearch.index.shard.ShardStateMetaData;
5152
import org.elasticsearch.snapshots.Snapshot;
@@ -176,6 +177,69 @@ public void testStoreException() {
176177
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
177178
}
178179

180+
/**
181+
* Tests that when the node returns a ShardLockObtainFailedException, it will be considered as a valid shard copy
182+
*/
183+
public void testShardLockObtainFailedException() {
184+
final RoutingAllocation allocation;
185+
boolean useAllocationIds = randomBoolean();
186+
if (useAllocationIds) {
187+
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
188+
randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
189+
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(),
190+
new ShardLockObtainFailedException(shardId, "test"));
191+
} else {
192+
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
193+
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
194+
}
195+
testAllocator.allocateUnassigned(allocation);
196+
assertThat(allocation.routingNodesChanged(), equalTo(true));
197+
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
198+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
199+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.getId()));
200+
if (useAllocationIds) {
201+
// check that allocation id is reused
202+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("allocId1"));
203+
}
204+
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
205+
}
206+
207+
/**
208+
* Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will
209+
* select the second node as target
210+
*/
211+
public void testShardLockObtainFailedExceptionPreferOtherValidCopies() {
212+
final RoutingAllocation allocation;
213+
boolean useAllocationIds = randomBoolean();
214+
String allocId1 = randomAsciiOfLength(10);
215+
String allocId2 = randomAsciiOfLength(10);
216+
if (useAllocationIds) {
217+
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
218+
randomFrom(Version.V_2_0_0, Version.CURRENT), allocId1, allocId2);
219+
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, allocId1, randomBoolean(),
220+
new ShardLockObtainFailedException(shardId, "test"));
221+
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, allocId2, randomBoolean(), null);
222+
} else {
223+
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
224+
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
225+
if (randomBoolean()) {
226+
testAllocator.addData(node2, randomIntBetween(2, 4), null, randomBoolean(), null);
227+
} else {
228+
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some alloc id", randomBoolean(), null);
229+
}
230+
}
231+
testAllocator.allocateUnassigned(allocation);
232+
assertThat(allocation.routingNodesChanged(), equalTo(true));
233+
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
234+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
235+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
236+
if (useAllocationIds) {
237+
// check that allocation id is reused
238+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2));
239+
}
240+
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
241+
}
242+
179243
/**
180244
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
181245
*/

0 commit comments

Comments
 (0)