diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index 620d575fc802c..fd46ce67bbe6a 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -11,7 +11,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -22,13 +21,16 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xpack.core.watcher.WatcherMetaData; import org.elasticsearch.xpack.core.watcher.WatcherState; import org.elasticsearch.xpack.core.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -45,7 +47,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope); private final AtomicReference state = new AtomicReference<>(WatcherState.STARTED); - private final AtomicReference> previousAllocationIds = new AtomicReference<>(Collections.emptyList()); + private final AtomicReference> previousShardRoutings = new AtomicReference<>(Collections.emptyList()); private final boolean requireManualStart; private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this. private volatile WatcherService watcherService; @@ -144,15 +146,20 @@ public void clusterChanged(ClusterChangedEvent event) { return; } - List currentAllocationIds = localShards.stream() - .map(ShardRouting::allocationId) - .map(AllocationId::getId) - .sorted() + // also check if non local shards have changed, as loosing a shard on a + // remote node or adding a replica on a remote node needs to trigger a reload too + Set localShardIds = localShards.stream().map(ShardRouting::shardId).collect(Collectors.toSet()); + List allShards = event.state().routingTable().index(watchIndex).shardsWithState(STARTED); + allShards.addAll(event.state().routingTable().index(watchIndex).shardsWithState(RELOCATING)); + List localAffectedShardRoutings = allShards.stream() + .filter(shardRouting -> localShardIds.contains(shardRouting.shardId())) + // shardrouting is not comparable, so we need some order mechanism + .sorted(Comparator.comparing(ShardRouting::hashCode)) .collect(Collectors.toList()); - if (previousAllocationIds.get().equals(currentAllocationIds) == false) { + if (previousShardRoutings.get().equals(localAffectedShardRoutings) == false) { if (watcherService.validate(event.state())) { - previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds)); + previousShardRoutings.set(localAffectedShardRoutings); if (state.get() == WatcherState.STARTED) { watcherService.reload(event.state(), "new local watcher shard allocation ids"); } else if (state.get() == WatcherState.STOPPED) { @@ -187,13 +194,13 @@ private boolean isWatcherStoppedManually(ClusterState state) { * @return true, if existing allocation ids were cleaned out, false otherwise */ private boolean clearAllocationIds() { - List previousIds = previousAllocationIds.getAndSet(Collections.emptyList()); - return previousIds.equals(Collections.emptyList()) == false; + List previousIds = previousShardRoutings.getAndSet(Collections.emptyList()); + return previousIds.isEmpty() == false; } // for testing purposes only - List allocationIds() { - return previousAllocationIds.get(); + List shardRoutings() { + return previousShardRoutings.get(); } public WatcherState getState() { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 700901753d4a1..384338af5a284 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -254,9 +254,12 @@ public void testReplicaWasAddedOrRemoved() { .add(newNode("node_2")) .build(); + ShardRouting firstShardOnSecondNode = TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED); + ShardRouting secondShardOnFirstNode = TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED); + IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex) - .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED)) - .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED)) + .addShard(secondShardOnFirstNode) + .addShard(firstShardOnSecondNode) .build(); IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX) @@ -273,10 +276,19 @@ public void testReplicaWasAddedOrRemoved() { .metaData(MetaData.builder().put(indexMetaData, false)) .build(); + // add a replica in the local node + boolean addShardOnLocalNode = randomBoolean(); + final ShardRouting addedShardRouting; + if (addShardOnLocalNode) { + addedShardRouting = TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED); + } else { + addedShardRouting = TestShardRouting.newShardRouting(secondShardId, "node_2", false, STARTED); + } + IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex) - .addShard(TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED)) - .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED)) - .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED)) + .addShard(secondShardOnFirstNode) + .addShard(firstShardOnSecondNode) + .addShard(addedShardRouting) .build(); ClusterState stateWithReplicaAdded = ClusterState.builder(new ClusterName("my-cluster")) @@ -477,7 +489,67 @@ public void testDataNodeWithoutDataCanStart() { assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); } - private ClusterState startWatcher() { + // this emulates a node outage somewhere in the cluster that carried a watcher shard + // the number of shards remains the same, but we need to ensure that watcher properly reloads + // previously we only checked the local shard allocations, but we also need to check if shards in the cluster have changed + public void testWatcherReloadsOnNodeOutageWithWatcherShard() { + Index watchIndex = new Index(Watch.INDEX, "foo"); + ShardId shardId = new ShardId(watchIndex, 0); + String localNodeId = randomFrom("node_1", "node_2"); + String outageNodeId = localNodeId.equals("node_1") ? "node_2" : "node_1"; + DiscoveryNodes previousDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId) + .add(newNode(localNodeId)) + .add(newNode(outageNodeId)) + .build(); + + ShardRouting replicaShardRouting = TestShardRouting.newShardRouting(shardId, localNodeId, false, STARTED); + ShardRouting primartShardRouting = TestShardRouting.newShardRouting(shardId, outageNodeId, true, STARTED); + IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex) + .addShard(replicaShardRouting) + .addShard(primartShardRouting) + .build(); + + IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX) + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6) + ).build(); + + ClusterState previousState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(previousDiscoveryNodes) + .routingTable(RoutingTable.builder().add(previousWatchRoutingTable).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); + + ShardRouting nowPrimaryShardRouting = replicaShardRouting.moveActiveReplicaToPrimary(); + IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex) + .addShard(nowPrimaryShardRouting) + .build(); + + DiscoveryNodes currentDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId) + .add(newNode(localNodeId)) + .build(); + + ClusterState currentState = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(currentDiscoveryNodes) + .routingTable(RoutingTable.builder().add(currentWatchRoutingTable).build()) + .metaData(MetaData.builder().put(indexMetaData, false)) + .build(); + + // initialize the previous state, so all the allocation ids are loaded + when(watcherService.validate(anyObject())).thenReturn(true); + lifeCycleService.clusterChanged(new ClusterChangedEvent("whatever", previousState, currentState)); + + reset(watcherService); + when(watcherService.validate(anyObject())).thenReturn(true); + ClusterChangedEvent event = new ClusterChangedEvent("whatever", currentState, previousState); + lifeCycleService.clusterChanged(event); + verify(watcherService).reload(eq(event.state()), anyString()); + } + + private void startWatcher() { Index index = new Index(Watch.INDEX, "uuid"); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); indexRoutingTableBuilder.addShard( @@ -506,12 +578,10 @@ private ClusterState startWatcher() { lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState)); assertThat(lifeCycleService.getState(), is(WatcherState.STARTED)); verify(watcherService, times(1)).reload(eq(state), anyString()); - assertThat(lifeCycleService.allocationIds(), hasSize(1)); + assertThat(lifeCycleService.shardRoutings(), hasSize(1)); // reset the mock, the user has to mock everything themselves again reset(watcherService); - - return state; } private List randomIndexPatterns() {