Skip to content

Commit 85654b4

Browse files
authored
Use any index specified by .watches for Watcher (#39541) (#39706)
* Use any index specified by .watches for Watcher (#39541) Previously, Watcher only attached its listener to indices that started with the prefix `.watches`, which causes Watcher to silently fail to schedule newly created Watches if the `.watches` alias is redirected to an index that does not start with `.watches`. Watcher now attaches the listener to all indices, so that Watcher can respond to changes in which index has the `.watches` alias. Also adjusts the tests to randomly use non-prefixed concrete indices for .watches and .triggered_watches.
1 parent 5e6953a commit 85654b4

File tree

3 files changed

+111
-12
lines changed

3 files changed

+111
-12
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -608,11 +608,9 @@ public void onIndexModule(IndexModule module) {
608608
}
609609

610610
assert listener != null;
611-
// for now, we only add this index operation listener to indices starting with .watches
612-
// this also means, that aliases pointing to this index have to follow this notation
613-
if (module.getIndex().getName().startsWith(Watch.INDEX)) {
614-
module.addIndexOperationListener(listener);
615-
}
611+
// Attach a listener to every index so that we can react to alias changes.
612+
// This listener will be a no-op except on the index pointed to by .watches
613+
module.addIndexOperationListener(listener);
616614
}
617615

618616
static void validAutoCreateIndex(Settings settings, Logger logger) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.watcher;
8+
9+
import org.elasticsearch.action.search.SearchResponse;
10+
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
11+
import org.elasticsearch.xpack.core.watcher.watch.Watch;
12+
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
13+
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
14+
15+
import java.util.Locale;
16+
17+
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
18+
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
19+
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
20+
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
21+
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
22+
import static org.hamcrest.Matchers.greaterThan;
23+
24+
public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {
25+
26+
@Override
27+
protected boolean timeWarped() {
28+
return false;
29+
}
30+
31+
public void testCanUseAnyConcreteIndexName() throws Exception {
32+
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
33+
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
34+
createIndex(watchResultsIndex);
35+
36+
stopWatcher();
37+
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
38+
startWatcher();
39+
40+
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
41+
.trigger(schedule(interval("3s")))
42+
.input(noneInput())
43+
.condition(InternalAlwaysCondition.INSTANCE)
44+
.addAction("indexer", indexAction(watchResultsIndex, "_doc")))
45+
.get();
46+
47+
assertTrue(putWatchResponse.isCreated());
48+
49+
assertBusy(() -> {
50+
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();
51+
assertThat((int) searchResult.getHits().getTotalHits(), greaterThan(0));
52+
});
53+
}
54+
}

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@
77

88
import org.apache.logging.log4j.Logger;
99
import org.elasticsearch.action.admin.indices.alias.Alias;
10+
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
1011
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
12+
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1113
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
1214
import org.elasticsearch.action.search.SearchRequestBuilder;
1315
import org.elasticsearch.action.search.SearchResponse;
1416
import org.elasticsearch.action.support.IndicesOptions;
1517
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
1618
import org.elasticsearch.cluster.ClusterState;
1719
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
20+
import org.elasticsearch.cluster.metadata.MappingMetaData;
1821
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1922
import org.elasticsearch.common.collect.Tuple;
2023
import org.elasticsearch.common.network.NetworkModule;
@@ -192,7 +195,7 @@ public void _setup() throws Exception {
192195
internalCluster().setDisruptionScheme(ice);
193196
ice.startDisrupting();
194197
}
195-
198+
stopWatcher();
196199
createWatcherIndicesOrAliases();
197200
startWatcher();
198201
}
@@ -219,13 +222,19 @@ private void createWatcherIndicesOrAliases() throws Exception {
219222
// alias for .watches, setting the index template to the same as well
220223
String watchIndexName;
221224
String triggeredWatchIndexName;
222-
if (rarely()) {
223-
watchIndexName = ".watches-alias-index";
224-
CreateIndexResponse response = client().admin().indices().prepareCreate(watchIndexName)
225+
if (randomBoolean()) {
226+
// Create an index to get the template
227+
String tempIndex = ".watches" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
228+
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
225229
.setCause("Index to test aliases with .watches index")
226230
.addAlias(new Alias(Watch.INDEX))
227231
.get();
228232
assertAcked(response);
233+
234+
// Now replace it with a randomly named index
235+
watchIndexName = randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT);
236+
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, watchIndexName, Watch.DOC_TYPE);
237+
229238
logger.info("set alias for .watches index to [{}]", watchIndexName);
230239
} else {
231240
watchIndexName = Watch.INDEX;
@@ -237,13 +246,19 @@ private void createWatcherIndicesOrAliases() throws Exception {
237246
}
238247

239248
// alias for .triggered-watches, ensuring the index template is set appropriately
240-
if (rarely()) {
241-
triggeredWatchIndexName = ".triggered_watches-alias-index";
242-
CreateIndexResponse response = client().admin().indices().prepareCreate(triggeredWatchIndexName)
249+
if (randomBoolean()) {
250+
String tempIndex = ".triggered_watches-alias-index";
251+
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
243252
.setCause("Index to test aliases with .triggered-watches index")
244253
.addAlias(new Alias(TriggeredWatchStoreField.INDEX_NAME))
245254
.get();
246255
assertAcked(response);
256+
257+
// Now replace it with a randomly-named index
258+
triggeredWatchIndexName = randomValueOtherThan(watchIndexName,
259+
() -> randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT));
260+
replaceWatcherIndexWithRandomlyNamedIndex(TriggeredWatchStoreField.INDEX_NAME, triggeredWatchIndexName,
261+
TriggeredWatchStoreField.DOC_TYPE);
247262
logger.info("set alias for .triggered-watches index to [{}]", triggeredWatchIndexName);
248263
} else {
249264
triggeredWatchIndexName = TriggeredWatchStoreField.INDEX_NAME;
@@ -257,6 +272,38 @@ private void createWatcherIndicesOrAliases() throws Exception {
257272
}
258273
}
259274

275+
public void replaceWatcherIndexWithRandomlyNamedIndex(String originalIndexOrAlias, String to, String docType) {
276+
GetIndexResponse index = client().admin().indices().prepareGetIndex().setIndices(originalIndexOrAlias).get();
277+
MappingMetaData mapping = index.getMappings().get(index.getIndices()[0]).get(docType);
278+
279+
Settings settings = index.getSettings().get(index.getIndices()[0]);
280+
Settings.Builder newSettings = Settings.builder().put(settings);
281+
newSettings.remove("index.provided_name");
282+
newSettings.remove("index.uuid");
283+
newSettings.remove("index.creation_date");
284+
newSettings.remove("index.version.created");
285+
286+
CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate(to)
287+
.addMapping(docType, mapping.sourceAsMap())
288+
.setSettings(newSettings)
289+
.get();
290+
assertTrue(createIndexResponse.isAcknowledged());
291+
ensureGreen(to);
292+
293+
AtomicReference<String> originalIndex = new AtomicReference<>(originalIndexOrAlias);
294+
boolean watchesIsAlias = client().admin().indices().prepareAliasesExist(originalIndexOrAlias).get().isExists();
295+
if (watchesIsAlias) {
296+
GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases(originalIndexOrAlias).get();
297+
assertEquals(1, aliasesResponse.getAliases().size());
298+
aliasesResponse.getAliases().forEach((aliasRecord) -> {
299+
assertEquals(1, aliasRecord.value.size());
300+
originalIndex.set(aliasRecord.key);
301+
});
302+
}
303+
client().admin().indices().prepareDelete(originalIndex.get()).get();
304+
client().admin().indices().prepareAliases().addAlias(to, originalIndexOrAlias).get();
305+
}
306+
260307
protected TimeWarp timeWarp() {
261308
assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
262309
return timeWarp;

0 commit comments

Comments
 (0)