Skip to content

Commit 38bf2bd

Browse files
committed
Watcher: Mark watcher as started only after loading watches (#30403)
Starting watcher should wait for the watches to be loaded before marking the status as started; this is now done via a callback. Also, reloading watcher could set the execution service to paused, which could lead to watches not being executed when run in tests. With this fix, a reload no longer changes the paused flag in the execution service; it just clears out the current queue and executions. Closes #30381
1 parent 35c5eab commit 38bf2bd

File tree

6 files changed

+80
-23
lines changed

6 files changed

+80
-23
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,7 @@ public void clusterChanged(ClusterChangedEvent event) {
111111
// if this is not a data node, we need to start it ourselves possibly
112112
if (event.state().nodes().getLocalNode().isDataNode() == false &&
113113
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
114-
watcherService.start(event.state());
115-
this.state.set(WatcherState.STARTED);
114+
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
116115
return;
117116
}
118117

@@ -160,8 +159,8 @@ public void clusterChanged(ClusterChangedEvent event) {
160159
if (state.get() == WatcherState.STARTED) {
161160
watcherService.reload(event.state(), "new local watcher shard allocation ids");
162161
} else if (state.get() == WatcherState.STOPPED) {
163-
watcherService.start(event.state());
164-
this.state.set(WatcherState.STARTED);
162+
this.state.set(WatcherState.STARTING);
163+
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
165164
}
166165
} else {
167166
clearAllocationIds();
@@ -172,8 +171,8 @@ public void clusterChanged(ClusterChangedEvent event) {
172171
WatcherState watcherState = this.state.get();
173172
if (event.localNodeMaster()) {
174173
if (watcherState != WatcherState.STARTED && watcherState != WatcherState.STARTING) {
175-
watcherService.start(event.state());
176-
this.state.set(WatcherState.STARTED);
174+
this.state.set(WatcherState.STARTING);
175+
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
177176
}
178177
} else {
179178
if (watcherState == WatcherState.STARTED || watcherState == WatcherState.STARTING) {

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherService.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,23 +183,40 @@ void reload(ClusterState state, String reason) {
183183
// by checking the cluster state version before and after loading the watches we can potentially just exit without applying the
184184
// changes
185185
processedClusterStateVersion.set(state.getVersion());
186-
pauseExecution(reason);
187186
triggerService.pauseExecution();
187+
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
188+
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
188189

189190
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
190191
e -> logger.error("error reloading watcher", e)));
191192
}
192193

193-
public void start(ClusterState state) {
194+
/**
195+
* start the watcher service, load watches in the background
196+
*
197+
* @param state the current cluster state
198+
* @param postWatchesLoadedCallback the callback to be triggered when watches were loaded successfully
199+
*/
200+
public void start(ClusterState state, Runnable postWatchesLoadedCallback) {
201+
executionService.unPause();
194202
processedClusterStateVersion.set(state.getVersion());
195-
executor.execute(wrapWatcherService(() -> reloadInner(state, "starting", true),
203+
executor.execute(wrapWatcherService(() -> {
204+
if (reloadInner(state, "starting", true)) {
205+
postWatchesLoadedCallback.run();
206+
}
207+
},
196208
e -> logger.error("error starting watcher", e)));
197209
}
198210

199211
/**
200-
* reload the watches and start scheduling them
212+
* reload watches and start scheduling them
213+
*
214+
* @param state the current cluster state
215+
* @param reason the reason for reloading, will be logged
216+
* @param loadTriggeredWatches should triggered watches be loaded in this run, not needed for reloading, only for starting
217+
* @return true if no other loading of a newer cluster state happened in parallel, false otherwise
201218
*/
202-
private synchronized void reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
219+
private synchronized boolean reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
203220
// exit early if another thread has come in between
204221
if (processedClusterStateVersion.get() != state.getVersion()) {
205222
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
@@ -221,9 +238,11 @@ private synchronized void reloadInner(ClusterState state, String reason, boolean
221238
executionService.executeTriggeredWatches(triggeredWatches);
222239
}
223240
logger.debug("watch service has been reloaded, reason [{}]", reason);
241+
return true;
224242
} else {
225243
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
226244
state.getVersion(), processedClusterStateVersion.get());
245+
return false;
227246
}
228247
}
229248

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,25 @@ public void unPause() {
121121
}
122122

123123
/**
124-
* Pause the execution of the watcher executor
124+
* Pause the execution of the watcher executor, and empty the state.
125+
* Pausing means that no new watch executions will be done unless this pausing is explicitly unset.
126+
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
127+
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
128+
* {@link #clearExecutionsAndQueue()} is the way to go
129+
*
125130
* @return the number of tasks that have been removed
126131
*/
127132
public int pause() {
128133
paused.set(true);
134+
return clearExecutionsAndQueue();
135+
}
136+
137+
/**
138+
* Empty the currently queued tasks and wait for current executions to finish.
139+
*
140+
* @return the number of tasks that have been removed
141+
*/
142+
public int clearExecutionsAndQueue() {
129143
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
130144
this.clearExecutions();
131145
return cancelledTaskCount;

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void testManualStartStop() {
181181
reset(watcherService);
182182
when(watcherService.validate(clusterState)).thenReturn(true);
183183
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
184-
verify(watcherService, times(1)).start(eq(clusterState));
184+
verify(watcherService, times(1)).start(eq(clusterState), anyObject());
185185

186186
// no change, keep going
187187
reset(watcherService);
@@ -457,7 +457,7 @@ public void testWatcherStartsOnlyOnMasterWhenOldNodesAreInCluster() throws Excep
457457

458458
// now start again
459459
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, watcherStoppedState));
460-
verify(watcherService).start(any(ClusterState.class));
460+
verify(watcherService).start(any(ClusterState.class), anyObject());
461461
}
462462

463463
public void testDistributedWatchExecutionDisabledWith5xNodesInCluster() throws Exception {
@@ -509,7 +509,7 @@ public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex
509509
when(watcherService.validate(eq(state))).thenReturn(true);
510510

511511
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
512-
verify(watcherService, times(0)).start(any(ClusterState.class));
512+
verify(watcherService, times(0)).start(any(ClusterState.class), anyObject());
513513
}
514514

515515
public void testWatcherStopsWhenMasterNodeIsMissing() {

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import static org.mockito.Matchers.any;
6969
import static org.mockito.Matchers.eq;
7070
import static org.mockito.Mockito.mock;
71+
import static org.mockito.Mockito.never;
7172
import static org.mockito.Mockito.verify;
7273
import static org.mockito.Mockito.when;
7374

@@ -199,7 +200,7 @@ void stopExecutor() {
199200
when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture);
200201
clearScrollFuture.onResponse(new ClearScrollResponse(true, 1));
201202

202-
service.start(clusterState);
203+
service.start(clusterState, () -> {});
203204

204205
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
205206
verify(triggerService).start(captor.capture());
@@ -238,6 +239,27 @@ void stopExecutor() {
238239
verify(triggerEngine).pauseExecution();
239240
}
240241

242+
// if we have to reload the watcher service, the execution service should not be paused, as this might
243+
// result in missing executions
244+
public void testReloadingWatcherDoesNotPauseExecutionService() {
245+
ExecutionService executionService = mock(ExecutionService.class);
246+
TriggerService triggerService = mock(TriggerService.class);
247+
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
248+
executionService, mock(WatchParser.class), mock(Client.class), executorService) {
249+
@Override
250+
void stopExecutor() {
251+
}
252+
};
253+
254+
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
255+
csBuilder.metaData(MetaData.builder());
256+
257+
service.reload(csBuilder.build(), "whatever");
258+
verify(executionService).clearExecutionsAndQueue();
259+
verify(executionService, never()).pause();
260+
verify(triggerService).pauseExecution();
261+
}
262+
241263
private static DiscoveryNode newNode() {
242264
return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
243265
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/ExecutionVarsIntegrationTests.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.function.Function;
2424

25+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
2526
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
2627
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
2728
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
@@ -35,6 +36,8 @@
3536

3637
public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTestCase {
3738

39+
private String watchId = randomAlphaOfLength(20);
40+
3841
@Override
3942
protected List<Class<? extends Plugin>> pluginTypes() {
4043
List<Class<? extends Plugin>> types = super.pluginTypes();
@@ -106,7 +109,7 @@ protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
106109
public void testVars() throws Exception {
107110
WatcherClient watcherClient = watcherClient();
108111

109-
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
112+
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
110113
.trigger(schedule(cron("0/1 * * * * ?")))
111114
.input(simpleInput("value", 5))
112115
.condition(new ScriptCondition(
@@ -125,7 +128,7 @@ public void testVars() throws Exception {
125128

126129
assertThat(putWatchResponse.isCreated(), is(true));
127130

128-
timeWarp().trigger("_id");
131+
timeWarp().trigger(watchId);
129132

130133
flush();
131134
refresh();
@@ -134,11 +137,11 @@ public void testVars() throws Exception {
134137
// defaults to match all;
135138
});
136139

137-
assertThat(searchResponse.getHits().getTotalHits(), is(1L));
140+
assertHitCount(searchResponse, 1L);
138141

139142
Map<String, Object> source = searchResponse.getHits().getAt(0).getSourceAsMap();
140143

141-
assertValue(source, "watch_id", is("_id"));
144+
assertValue(source, "watch_id", is(watchId));
142145
assertValue(source, "state", is("executed"));
143146

144147
// we don't store the computed vars in history
@@ -170,7 +173,7 @@ public void testVars() throws Exception {
170173
public void testVarsManual() throws Exception {
171174
WatcherClient watcherClient = watcherClient();
172175

173-
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
176+
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
174177
.trigger(schedule(cron("0/1 * * * * ? 2020")))
175178
.input(simpleInput("value", 5))
176179
.condition(new ScriptCondition(
@@ -192,13 +195,13 @@ public void testVarsManual() throws Exception {
192195
boolean debug = randomBoolean();
193196

194197
ExecuteWatchResponse executeWatchResponse = watcherClient
195-
.prepareExecuteWatch("_id")
198+
.prepareExecuteWatch(watchId)
196199
.setDebug(debug)
197200
.get();
198201
assertThat(executeWatchResponse.getRecordId(), notNullValue());
199202
XContentSource source = executeWatchResponse.getRecordSource();
200203

201-
assertValue(source, "watch_id", is("_id"));
204+
assertValue(source, "watch_id", is(watchId));
202205
assertValue(source, "state", is("executed"));
203206

204207
if (debug) {

0 commit comments

Comments
 (0)