From 926a671c6f96cc6d786b7da78093f53e75b92f07 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 2 Jul 2019 11:43:21 -0500 Subject: [PATCH 1/5] Ensure no Watches are running after Watcher is stopped. Watcher keeps track of which watches are currently running, keyed by watch name/id. If a watch is currently running, Watcher will not run the same watch again, which results in the message: "Watch is already queued in thread pool" and a state: "not_executed_already_queued". When Watcher is stopped, it will stop watcher (rejecting any new watches), but allow the currently running watches to run to completion. Waiting for the currently running watches to complete is done asynchronously to the stopping of Watcher. This means that Watcher will report as fully stopped while there is still a background thread waiting for all of the Watches to finish before it removes them from its list of currently running Watches. The integration tests start and stop Watcher between each test. The goal is to ensure a clean state between tests. However, since Watcher can report "yes - I am stopped" while there are still running Watches, the tests may bleed over into each other, especially on slow machines. This can result in errors related to "Watch is already queued in thread pool" and a state: "not_executed_already_queued", and is VERY difficult to reproduce. This commit changes the waiting for Watches on stop/pause from an async wait back to a sync wait, as it worked prior to #30118. This helps ensure that, for testing scenarios, the stop is much more predictable, such that once Watcher is fully stopped, no Watches are running. This should have little impact, if any, on production code, since Watcher isn't stopped/paused too often, and when it is stopped/paused the behavior is the same; the wait will just run on the calling thread, not a generic thread. 
--- .../watcher/execution/ExecutionService.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index b1ba8c1522acf..2983f6faa1438 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -106,7 +106,7 @@ public class ExecutionService { private final WatchExecutor executor; private final ExecutorService genericExecutor; - private AtomicReference currentExecutions = new AtomicReference<>(); + private CurrentExecutions currentExecutions; private final AtomicBoolean paused = new AtomicBoolean(false); public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, @@ -123,7 +123,7 @@ public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredW this.client = client; this.genericExecutor = genericExecutor; this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30)); - this.currentExecutions.set(new CurrentExecutions()); + this.currentExecutions = new CurrentExecutions(); } public void unPause() { @@ -169,12 +169,12 @@ public long executionThreadPoolMaxSize() { // for testing only CurrentExecutions getCurrentExecutions() { - return currentExecutions.get(); + return currentExecutions; } public List currentExecutions() { List currentExecutions = new ArrayList<>(); - for (WatchExecution watchExecution : this.currentExecutions.get()) { + for (WatchExecution watchExecution : this.currentExecutions) { currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: @@ -279,7 +279,7 
@@ public WatchRecord execute(WatchExecutionContext ctx) { WatchRecord record = null; final String watchId = ctx.id().watchId(); try { - boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread())); + boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread())); if (executionAlreadyExists) { logger.trace("not executing watch [{}] because it is already queued", watchId); record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); @@ -334,7 +334,7 @@ record = createWatchRecord(record, ctx, e); triggeredWatchStore.delete(ctx.id()); } - currentExecutions.get().remove(watchId); + currentExecutions.remove(watchId); logger.debug("finished [{}]/[{}]", watchId, ctx.id()); } return record; @@ -578,10 +578,9 @@ public Counters executionTimes() { * This clears out the current executions and sets new empty current executions * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea */ - private void clearExecutions() { - final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); - // clear old executions in background, no need to wait - genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout)); + private synchronized void clearExecutions() { + currentExecutions.sealAndAwaitEmpty(maxStopTimeout); + currentExecutions = new CurrentExecutions(); } // the watch execution task takes another runnable as parameter From 9d18274a2635f7b06ea10aea7f317941eca7ed9e Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 2 Jul 2019 11:59:38 -0500 Subject: [PATCH 2/5] fix check style --- .../elasticsearch/xpack/watcher/execution/ExecutionService.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 2983f6faa1438..1d058dedc0c3f 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -78,7 +78,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; From ce5dd1b7e6faac93209ffdfc1857ed9db97a1525 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 2 Jul 2019 17:56:07 -0500 Subject: [PATCH 3/5] Revert "fix check style" This reverts commit 9d18274a2635f7b06ea10aea7f317941eca7ed9e. --- .../elasticsearch/xpack/watcher/execution/ExecutionService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 1d058dedc0c3f..2983f6faa1438 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -78,6 +78,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; From ec360daeaf6738d2b509920feea13c3b705b8ca3 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 2 Jul 2019 17:56:25 -0500 Subject: [PATCH 4/5] Revert "Ensure no Watches are 
running after Watcher is stopped." This reverts commit 926a671c6f96cc6d786b7da78093f53e75b92f07. --- .../watcher/execution/ExecutionService.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 2983f6faa1438..b1ba8c1522acf 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -106,7 +106,7 @@ public class ExecutionService { private final WatchExecutor executor; private final ExecutorService genericExecutor; - private CurrentExecutions currentExecutions; + private AtomicReference currentExecutions = new AtomicReference<>(); private final AtomicBoolean paused = new AtomicBoolean(false); public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredWatchStore triggeredWatchStore, WatchExecutor executor, @@ -123,7 +123,7 @@ public ExecutionService(Settings settings, HistoryStore historyStore, TriggeredW this.client = client; this.genericExecutor = genericExecutor; this.indexDefaultTimeout = settings.getAsTime("xpack.watcher.internal.ops.index.default_timeout", TimeValue.timeValueSeconds(30)); - this.currentExecutions = new CurrentExecutions(); + this.currentExecutions.set(new CurrentExecutions()); } public void unPause() { @@ -169,12 +169,12 @@ public long executionThreadPoolMaxSize() { // for testing only CurrentExecutions getCurrentExecutions() { - return currentExecutions; + return currentExecutions.get(); } public List currentExecutions() { List currentExecutions = new ArrayList<>(); - for (WatchExecution watchExecution : this.currentExecutions) { + for (WatchExecution watchExecution : this.currentExecutions.get()) { 
currentExecutions.add(watchExecution.createSnapshot()); } // Lets show the longest running watch first: @@ -279,7 +279,7 @@ public WatchRecord execute(WatchExecutionContext ctx) { WatchRecord record = null; final String watchId = ctx.id().watchId(); try { - boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread())); + boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread())); if (executionAlreadyExists) { logger.trace("not executing watch [{}] because it is already queued", watchId); record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool"); @@ -334,7 +334,7 @@ record = createWatchRecord(record, ctx, e); triggeredWatchStore.delete(ctx.id()); } - currentExecutions.remove(watchId); + currentExecutions.get().remove(watchId); logger.debug("finished [{}]/[{}]", watchId, ctx.id()); } return record; @@ -578,9 +578,10 @@ public Counters executionTimes() { * This clears out the current executions and sets new empty current executions * This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea */ - private synchronized void clearExecutions() { - currentExecutions.sealAndAwaitEmpty(maxStopTimeout); - currentExecutions = new CurrentExecutions(); + private void clearExecutions() { + final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); + // clear old executions in background, no need to wait + genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout)); } // the watch execution task takes another runnable as parameter From caa9ae909951752fa2e89807c93c8d252d3bc35a Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Tue, 2 Jul 2019 17:58:59 -0500 Subject: [PATCH 5/5] Ensure no Watches are running after Watcher is stopped. 
Watcher keeps track of which watches are currently running, keyed by watch name/id. If a watch is currently running, Watcher will not run the same watch again, which results in the message: "Watch is already queued in thread pool" and a state: "not_executed_already_queued". When Watcher is stopped, it will stop watcher (rejecting any new watches), but allow the currently running watches to run to completion. Waiting for the currently running watches to complete is done asynchronously to the stopping of Watcher. This means that Watcher will report as fully stopped while there is still a background thread waiting for all of the Watches to finish before it removes them from its list of currently running Watches. The integration tests start and stop Watcher between each test. The goal is to ensure a clean state between tests. However, since Watcher can report "yes - I am stopped" while there are still running Watches, the tests may bleed over into each other, especially on slow machines. This can result in errors related to "Watch is already queued in thread pool" and a state: "not_executed_already_queued", and is VERY difficult to reproduce. This commit changes the waiting for Watches on stop/pause from an async wait back to a sync wait, as it worked prior to #30118. This helps ensure that, for testing scenarios, the stop is much more predictable, such that once Watcher is fully stopped, no Watches are running. This should have little impact, if any, on production code, since Watcher isn't stopped/paused too often, and when it is stopped/paused the behavior is the same; the wait will just run on the calling thread, not a generic thread. 
--- .../xpack/watcher/execution/ExecutionService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index b1ba8c1522acf..989712f68056b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -580,8 +580,7 @@ public Counters executionTimes() { */ private void clearExecutions() { final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions()); - // clear old executions in background, no need to wait - genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout)); + currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout); } // the watch execution task takes another runnable as parameter