Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo
}
}

private synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
// Handles a failed shard recovery by delegating to failAndRemoveShard with the fixed
// message "failed recovery" and the cluster state currently returned by clusterService.state().
// The method is synchronized, so calls on the same instance are serialized on its monitor.
// package-private for testing
synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) {
// the routing, the send-failure flag and the cause are forwarded unchanged
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state());
}

Expand All @@ -726,7 +727,10 @@ private void failAndRemoveShard(ShardRouting shardRouting, boolean sendShardFail
try {
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
if (indexService != null) {
indexService.removeShard(shardRouting.shardId().id(), message);
Shard shard = indexService.getShardOrNull(shardRouting.shardId().id());
if (shard != null && shard.routingEntry().isSameAllocation(shardRouting)) {
indexService.removeShard(shardRouting.shardId().id(), message);
}
}
} catch (ShardNotFoundException e) {
// the node got closed on us, ignore it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,40 @@ public void testInitializingPrimaryRemovesInitializingReplicaWithSameAID() {

}

/**
 * Applies a cluster state that adds an index to a node holding one of its replica shards, then
 * checks that {@code handleRecoveryFailure} leaves the local shard in place when called with a
 * routing obtained from {@code reinitializeReplicaShard()} (treated by this test as an unrelated
 * allocation) and removes it when called with the routing that was actually applied.
 */
public void testRecoveryFailures() {
    disableRandomFailures();
    final String indexName = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT);
    final ClusterState stateWithIndex = ClusterStateCreationUtils.state(
        indexName, randomBoolean(), ShardRoutingState.STARTED, ShardRoutingState.INITIALIZING);

    // starting point: same cluster state, but with the index stripped from the metadata and an empty routing table
    final MetaData.Builder metaDataWithoutIndex = MetaData.builder(stateWithIndex.metaData()).remove(indexName);
    final RoutingTable emptyRoutingTable = RoutingTable.builder().build();
    final ClusterState stateWithoutIndex = ClusterState.builder(stateWithIndex)
        .metaData(metaDataWithoutIndex)
        .routingTable(emptyRoutingTable)
        .build();

    // use the node that holds the first replica of shard 0 as the node on which the index-added event is simulated
    final List<ShardRouting> replicas = stateWithIndex.routingTable().index(indexName).shard(0).replicaShards();
    final ShardRouting replicaRouting = replicas.get(0);
    final ShardId replicaShardId = replicaRouting.shardId();
    final DiscoveryNode localNode = stateWithIndex.nodes().get(replicaRouting.currentNodeId());

    // replay the transition from "no index" to "index present" on that node
    final ClusterState localStateWithIndex = adaptClusterStateToLocalNode(stateWithIndex, localNode);
    final ClusterState localStateWithoutIndex = adaptClusterStateToLocalNode(stateWithoutIndex, localNode);
    final IndicesClusterStateService service = createIndicesClusterStateService(localNode, RecordingIndicesService::new);
    service.start();
    final ClusterChangedEvent indexAdded =
        new ClusterChangedEvent("cluster state change that adds the index", localStateWithIndex, localStateWithoutIndex);
    service.applyClusterState(indexAdded);

    assertNotNull(service.indicesService.getShardOrNull(replicaShardId));

    // a failure reported for an unrelated allocation must leave the shard in place
    final ShardRouting unrelatedAllocation = replicaRouting.reinitializeReplicaShard();
    service.handleRecoveryFailure(unrelatedAllocation, false, new Exception("dummy"));
    assertNotNull(service.indicesService.getShardOrNull(replicaShardId));

    // a failure reported for the applied routing removes the shard
    service.handleRecoveryFailure(replicaRouting, false, new Exception("dummy"));
    assertNull(service.indicesService.getShardOrNull(replicaShardId));
}

public ClusterState randomInitialClusterState(Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap,
Supplier<MockIndicesService> indicesServiceSupplier) {
List<DiscoveryNode> allNodes = new ArrayList<>();
Expand Down