Skip to content

Commit 3e64d74

Browse files
committed
review comments
1 parent c4a123b commit 3e64d74

File tree

2 files changed

+58
-19
lines changed

2 files changed

+58
-19
lines changed

core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,11 @@ private static Map<String, Decision> buildNodeDecisions(NodesToAllocate nodesToA
257257
return nodeDecisions;
258258
}
259259

260+
private static final Comparator<NodeGatewayStartedShards> NO_STORE_EXCEPTION_FIRST_COMPARATOR =
261+
Comparator.comparing((NodeGatewayStartedShards state) -> state.storeException() == null).reversed();
262+
private static final Comparator<NodeGatewayStartedShards> PRIMARY_FIRST_COMPARATOR =
263+
Comparator.comparing(NodeGatewayStartedShards::primary).reversed();
264+
260265
/**
261266
* Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
262267
* inSyncAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
@@ -266,8 +271,7 @@ protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRo
266271
Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
267272
FetchResult<NodeGatewayStartedShards> shardState,
268273
Logger logger) {
269-
LinkedList<NodeGatewayStartedShards> matchingNodeShardStates = new LinkedList<>();
270-
LinkedList<NodeGatewayStartedShards> nonMatchingNodeShardStates = new LinkedList<>();
274+
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
271275
int numberOfAllocationsFound = 0;
272276
for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
273277
DiscoveryNode node = nodeShardState.getNode();
@@ -297,28 +301,27 @@ protected static NodeShardsResult buildAllocationIdBasedNodeShardsResult(ShardRo
297301
}
298302

299303
if (allocationId != null) {
304+
assert nodeShardState.storeException() == null ||
305+
nodeShardState.storeException() instanceof ShardLockObtainFailedException :
306+
"only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a store throwing " + nodeShardState.storeException();
300307
numberOfAllocationsFound++;
301-
if (inSyncAllocationIds.contains(allocationId)) {
302-
// put shards that were primary before and that didn't throw a ShardLockObtainFailedException first
303-
if (nodeShardState.primary() && nodeShardState.storeException() == null) {
304-
matchingNodeShardStates.addFirst(nodeShardState);
305-
} else {
306-
matchingNodeShardStates.addLast(nodeShardState);
307-
}
308-
} else if (matchAnyShard) {
309-
// put shards that were primary before and that didn't throw a ShardLockObtainFailedException first
310-
if (nodeShardState.primary() && nodeShardState.storeException() == null) {
311-
nonMatchingNodeShardStates.addFirst(nodeShardState);
312-
} else {
313-
nonMatchingNodeShardStates.addLast(nodeShardState);
314-
}
308+
if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {
309+
nodeShardStates.add(nodeShardState);
315310
}
316311
}
317312
}
318313

319-
List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
320-
nodeShardStates.addAll(matchingNodeShardStates);
321-
nodeShardStates.addAll(nonMatchingNodeShardStates);
314+
final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
315+
if (matchAnyShard) {
316+
// prefer shards with matching allocation ids, then stores that opened without an exception, then previous primaries
317+
Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
318+
(NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed();
319+
comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR).thenComparing(PRIMARY_FIRST_COMPARATOR);
320+
} else {
321+
comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
322+
}
323+
324+
nodeShardStates.sort(comparator);
322325

323326
if (logger.isTraceEnabled()) {
324327
logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", ")));

core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,42 @@ public void testShardLockObtainFailedException() {
202202
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
203203
}
204204

205+
/**
206+
* Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will
207+
* select the second node (the one whose store opened without an exception) as the allocation target
208+
*/
209+
public void testShardLockObtainFailedExceptionPreferOtherValidCopies() {
210+
final RoutingAllocation allocation;
211+
boolean useAllocationIds = randomBoolean();
212+
String allocId1 = randomAsciiOfLength(10);
213+
String allocId2 = randomAsciiOfLength(10);
214+
if (useAllocationIds) {
215+
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED,
216+
randomFrom(Version.V_2_0_0, Version.CURRENT), allocId1, allocId2);
217+
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, allocId1, randomBoolean(),
218+
new ShardLockObtainFailedException(shardId, "test"));
219+
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, allocId2, randomBoolean(), null);
220+
} else {
221+
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, Version.V_2_1_1);
222+
testAllocator.addData(node1, 3, null, randomBoolean(), new ShardLockObtainFailedException(shardId, "test"));
223+
if (randomBoolean()) {
224+
testAllocator.addData(node2, randomIntBetween(2, 4), null, randomBoolean(), null);
225+
} else {
226+
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some alloc id", randomBoolean(), null);
227+
}
228+
}
229+
testAllocator.allocateUnassigned(allocation);
230+
assertThat(allocation.routingNodesChanged(), equalTo(true));
231+
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
232+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
233+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.getId()));
234+
if (useAllocationIds) {
235+
// check that allocation id is reused
236+
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo(allocId2));
237+
}
238+
assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW);
239+
}
240+
205241
/**
206242
* Tests that when there is a node to allocate the shard to, it will be allocated to it.
207243
*/

0 commit comments

Comments
 (0)