From 4900c89fd841d34c4a40cb26064083a9dd7fd0ef Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Fri, 19 Apr 2019 16:24:30 -0500 Subject: [PATCH 1/7] Fix Watcher deadlock that can cause in-abilty to index documents. This commit removes the usage of the `BulkProcessor` to write history documents and delete triggered watches on a `EsRejectedExecutionException`. Since the exception could be handled on the write thread, the write thread can be blocked waiting on watcher threads (due to a synchronous method). This is problematic since those watcher threads can be blocked waiting on write threads. This commit also moves the handling of the exception to the generic threadpool to avoid submitting write requests from the write thread pool. fixes #41390 --- .../watcher/execution/ExecutionService.java | 87 ++++++++++++++++--- .../execution/ExecutionServiceTests.java | 75 ++++++++++++++-- 2 files changed, 142 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 9b730db7ec59b..6bcbe11a15ac6 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -15,8 +15,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; @@ -32,19 +34,25 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper; import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult; import org.elasticsearch.xpack.core.watcher.common.stats.Counters; import org.elasticsearch.xpack.core.watcher.condition.Condition; import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch; +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; +import org.elasticsearch.xpack.core.watcher.execution.Wid; +import org.elasticsearch.xpack.core.watcher.history.HistoryStoreField; import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.input.Input; +import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams; import org.elasticsearch.xpack.core.watcher.transform.Transform; import org.elasticsearch.xpack.core.watcher.trigger.TriggerEvent; import org.elasticsearch.xpack.core.watcher.watch.Watch; @@ -66,10 +74,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; @@ -97,6 +110,8 @@ public class ExecutionService { private final Client client; private final WatchExecutor executor; private final ExecutorService genericExecutor; + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock putUpdateLock = readWriteLock.readLock(); private AtomicReference currentExecutions = new AtomicReference<>(); private final AtomicBoolean paused = new AtomicBoolean(false); @@ -399,22 +414,70 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge try { executor.execute(new WatchExecutionTask(ctx, () -> execute(ctx))); } catch (EsRejectedExecutionException e) { - String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; - WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); - try { - if (ctx.overrideRecordOnConflict()) { - historyStore.forcePut(record); - } else { - historyStore.put(record); + //Using the generic pool here since this can happen from a write thread and we don't want to block a write + //thread to kick off these additional write/delete requests. + //Intentionally not using the HistoryStore or TriggerWatchStore to avoid re-using the same synchronous + //BulkProcessor which can cause a deadlock see #41390 + genericExecutor.execute(new WatchExecutionTask(ctx, () -> { + String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; + WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); + try { + forcePutHistory(record); + } catch (Exception exc) { + logger.error((Supplier) () -> + new ParameterizedMessage( + "Error storing watch history record for watch [{}] after thread pool rejection", + triggeredWatch.id()), exc); } - } catch (Exception exc) { - logger.error((Supplier) () -> - new ParameterizedMessage("Error storing watch history record for watch [{}] after thread pool rejection", - triggeredWatch.id()), exc); + deleteTrigger(triggeredWatch.id()); + })); + } + } + + /** + * Stores the specified watchRecord. + * Any existing watchRecord will be overwritten. + */ + private void forcePutHistory(WatchRecord watchRecord) { + String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); + putUpdateLock.lock(); + try { + try (XContentBuilder builder = XContentFactory.jsonBuilder(); + ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { + watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS); + IndexRequest request = new IndexRequest(index) + .id(watchRecord.id().value()) + .source(builder) + .opType(IndexRequest.OpType.CREATE); + client.index(request).get(30, TimeUnit.SECONDS); + logger.debug("indexed watch history record [{}]", watchRecord.id().value()); + } catch (VersionConflictEngineException vcee) { + watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, + "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]"); + try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); + ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + IndexRequest request = new IndexRequest(index) + .id(watchRecord.id().value()) + .source(xContentBuilder.value(watchRecord)); + client.index(request).get(30, TimeUnit.SECONDS); + } + logger.debug("overwrote watch history record [{}]", watchRecord.id().value()); } + } catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) { + final WatchRecord wr = watchRecord; + logger.error((Supplier) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe); + } finally { + putUpdateLock.unlock(); + } + } - triggeredWatchStore.delete(triggeredWatch.id()); + private void deleteTrigger(Wid watcherId) { + DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME); + request.id(watcherId.value()); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { + client.delete(request).actionGet(30, TimeUnit.SECONDS); } + logger.trace("successfully deleted triggered watch with id [{}]", watcherId); } WatchRecord executeInner(WatchExecutionContext ctx) { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 321cf979bca73..80cb657a5762e 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -5,11 +5,13 @@ */ package org.elasticsearch.xpack.watcher.execution; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; @@ -28,8 +30,11 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.authc.Authentication; @@ -48,6 +53,7 @@ import org.elasticsearch.xpack.core.watcher.execution.ExecutionPhase; import org.elasticsearch.xpack.core.watcher.execution.ExecutionState; import org.elasticsearch.xpack.core.watcher.execution.QueuedWatch; +import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionSnapshot; import org.elasticsearch.xpack.core.watcher.execution.Wid; @@ -91,6 +97,7 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @@ -844,11 +851,15 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc when(getResponse.isExists()).thenReturn(true); when(getResponse.getId()).thenReturn("foo"); mockGetWatchResponse(client, "foo", getResponse); + ActionFuture actionFuture = mock(ActionFuture.class); + when(actionFuture.get()).thenReturn(""); + when(client.index(any())).thenReturn(actionFuture); + when(client.delete(any())).thenReturn(actionFuture); + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); - // execute needs to fail as well as storing the history + // execute needs to fail doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); - doThrow(new ElasticsearchException("whatever")).when(historyStore).forcePut(any()); Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC)); @@ -856,10 +867,58 @@ public void testThatTriggeredWatchDeletionWorksOnExecutionRejection() throws Exc new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC) ,ZonedDateTime.now(ZoneOffset.UTC))); executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); - verify(triggeredWatchStore, times(1)).delete(wid); - ArgumentCaptor captor = ArgumentCaptor.forClass(WatchRecord.class); - verify(historyStore, times(1)).forcePut(captor.capture()); - assertThat(captor.getValue().state(), is(ExecutionState.THREADPOOL_REJECTION)); + ArgumentCaptor deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); + verify(client).delete(deleteCaptor.capture()); + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); + + ArgumentCaptor watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); + verify(client).index(watchHistoryCaptor.capture()); + + assertThat(watchHistoryCaptor.getValue().source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); + assertThat(watchHistoryCaptor.getValue().index(), containsString(".watcher-history")); + } + + public void testForcePutHistoryOnExecutionRejection() throws Exception { + Watch watch = mock(Watch.class); + when(watch.id()).thenReturn("foo"); + WatchStatus status = new WatchStatus(ZonedDateTime.now(ZoneOffset.UTC), Collections.emptyMap()); + when(watch.status()).thenReturn(status); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + when(getResponse.getId()).thenReturn("foo"); + mockGetWatchResponse(client, "foo", getResponse); + ActionFuture actionFuture = mock(ActionFuture.class); + when(actionFuture.get()).thenReturn(""); + when(client.index(any())) + .thenThrow(new VersionConflictEngineException( + new ShardId(new Index("mockindex", "mockuuid"), 0), "id", "explaination")) + .thenReturn(actionFuture); + when(client.delete(any())).thenReturn(actionFuture); + + when(parser.parseWithSecrets(eq("foo"), eq(true), any(), any(), any(), anyLong(), anyLong())).thenReturn(watch); + + // execute needs to fail + doThrow(new EsRejectedExecutionException()).when(executor).execute(any()); + + Wid wid = new Wid(watch.id(), ZonedDateTime.now(ZoneOffset.UTC)); + + TriggeredWatch triggeredWatch = new TriggeredWatch(wid, + new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC))); + executionService.executeTriggeredWatches(Collections.singleton(triggeredWatch)); + + ArgumentCaptor deleteCaptor = ArgumentCaptor.forClass(DeleteRequest.class); + verify(client).delete(deleteCaptor.capture()); + assertThat(deleteCaptor.getValue().index(), equalTo(TriggeredWatchStoreField.INDEX_NAME)); + assertThat(deleteCaptor.getValue().id(), equalTo(wid.value())); + + ArgumentCaptor watchHistoryCaptor = ArgumentCaptor.forClass(IndexRequest.class); + verify(client, times(2)).index(watchHistoryCaptor.capture()); + List indexRequests = watchHistoryCaptor.getAllValues(); + + assertThat(indexRequests.get(0).id(), equalTo(indexRequests.get(1).id())); + assertThat(indexRequests.get(0).source().utf8ToString(), containsString(ExecutionState.THREADPOOL_REJECTION.toString())); + assertThat(indexRequests.get(1).source().utf8ToString(), containsString(ExecutionState.EXECUTED_MULTIPLE_TIMES.toString())); } public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exception { @@ -898,7 +957,7 @@ public void testThatTriggeredWatchDeletionHappensOnlyIfWatchExists() throws Exce when(watch.status()).thenReturn(watchStatus); executionService.execute(context); - verify(triggeredWatchStore, never()).delete(any()); + verify(client, never()).delete(any()); } public void testThatSingleWatchCannotBeExecutedConcurrently() throws Exception { From fa3364c0073f9061e2ed55db9ed52b8db78499d7 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 24 Apr 2019 15:35:43 -0500 Subject: [PATCH 2/7] added test that fails or deadlocks in prior code --- .../watcher/execution/ExecutionService.java | 1 + .../integration/RejectedExecutionTests.java | 76 +++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 6bcbe11a15ac6..7832d808993ed 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -420,6 +420,7 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge //BulkProcessor which can cause a deadlock see #41390 genericExecutor.execute(new WatchExecutionTask(ctx, () -> { String message = "failed to run triggered watch [" + triggeredWatch.id() + "] due to thread pool capacity"; + logger.warn(message); WatchRecord record = ctx.abortBeforeExecution(ExecutionState.THREADPOOL_REJECTION, message); try { forcePutHistory(record); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java new file mode 100644 index 0000000000000..cfb33b790a856 --- /dev/null +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java @@ -0,0 +1,76 @@ +package org.elasticsearch.xpack.watcher.test.integration; + +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.LicenseService; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.watcher.client.WatcherClient; +import org.elasticsearch.xpack.watcher.condition.CompareCondition; +import org.elasticsearch.xpack.watcher.support.search.WatcherSearchTemplateRequest; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule; + +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; +import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction; +import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder; +import static org.elasticsearch.xpack.watcher.input.InputBuilders.searchInput; +import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.templateRequest; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class RejectedExecutionTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected boolean timeWarped() { + //need to use the real scheduler + return false; + } + + public void testHistoryAndTriggeredOnRejection() throws Exception { + WatcherClient watcherClient = watcherClient(); + createIndex("idx"); + client().prepareIndex("idx", "_doc").setSource("field", "a").get(); + refresh(); + WatcherSearchTemplateRequest request = templateRequest(searchSource().query(termQuery("field", "a")), "idx"); + watcherClient.preparePutWatch(randomAlphaOfLength(5)) + .setSource(watchBuilder() + .trigger(schedule(interval(1, IntervalSchedule.Interval.Unit.SECONDS))) + .input(searchInput(request)) + .condition(new CompareCondition("ctx.payload.hits.total", CompareCondition.Op.EQ, 1L)) + .addAction("_logger", loggingAction("_logging") + .setCategory("_category"))) + .get(); + + assertBusy(() -> { + flushAndRefresh(".watcher-history-*"); + SearchResponse searchResponse = client().prepareSearch(".watcher-history-*").get(); + assertThat(searchResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(2l)); + }, 10, TimeUnit.SECONDS); + + flushAndRefresh(".triggered_watches"); + SearchResponse searchResponse = client().prepareSearch(".triggered_watches").get(); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0l)); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(XPackSettings.MONITORING_ENABLED.getKey(), false) + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial") + .put("thread_pool.write.size", 1) + .put("thread_pool.write.queue_size", 1) + .put("xpack.watcher.thread_pool.size", 1) + .put("xpack.watcher.thread_pool.queue_size", 0) + .build(); + } + + +} From 99694a432da702c856ebde63957346ded96cd383 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 24 Apr 2019 16:11:51 -0500 Subject: [PATCH 3/7] add missing header --- .../watcher/test/integration/RejectedExecutionTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java index cfb33b790a856..5203c65f6fa2c 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java @@ -1,3 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ package org.elasticsearch.xpack.watcher.test.integration; import org.elasticsearch.action.search.SearchResponse; From b24ebdfc6160c41f95003adba7e5e74509fd2a73 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 24 Apr 2019 16:26:09 -0500 Subject: [PATCH 4/7] you win checkstyle --- .../watcher/test/integration/RejectedExecutionTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java index 5203c65f6fa2c..a457c1052cadb 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/RejectedExecutionTests.java @@ -54,12 +54,12 @@ public void testHistoryAndTriggeredOnRejection() throws Exception { assertBusy(() -> { flushAndRefresh(".watcher-history-*"); SearchResponse searchResponse = client().prepareSearch(".watcher-history-*").get(); - assertThat(searchResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(2l)); + assertThat(searchResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(2L)); }, 10, TimeUnit.SECONDS); flushAndRefresh(".triggered_watches"); SearchResponse searchResponse = client().prepareSearch(".triggered_watches").get(); - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0l)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); } @Override From 420d4d63fb0ae2c49225ff95fb3fc2c74deaf4af Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 25 Apr 2019 08:31:19 -0500 Subject: [PATCH 5/7] fix compilation error from merge collision --- .../elasticsearch/xpack/watcher/execution/ExecutionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 202317fafe3d9..7914b47ffa697 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -456,7 +456,7 @@ private void forcePutHistory(WatchRecord watchRecord) { watchRecord = new WatchRecord.MessageWatchRecord(watchRecord, ExecutionState.EXECUTED_MULTIPLE_TIMES, "watch record [{ " + watchRecord.id() + " }] has been stored before, previous state [" + watchRecord.state() + "]"); try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder(); - ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { + ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { IndexRequest request = new IndexRequest(index) .id(watchRecord.id().value()) .source(xContentBuilder.value(watchRecord)); From b65517f155b5fa5e7d6865131fa5841342cb2f46 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 25 Apr 2019 10:33:32 -0500 Subject: [PATCH 6/7] remove unecessary lock --- .../xpack/watcher/execution/ExecutionService.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 7914b47ffa697..83f5e10037711 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -110,8 +110,6 @@ public class ExecutionService { private final Client client; private final WatchExecutor executor; private final ExecutorService genericExecutor; - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock putUpdateLock = readWriteLock.readLock(); private AtomicReference currentExecutions = new AtomicReference<>(); private final AtomicBoolean paused = new AtomicBoolean(false); @@ -441,7 +439,6 @@ private void executeAsync(WatchExecutionContext ctx, final TriggeredWatch trigge */ private void forcePutHistory(WatchRecord watchRecord) { String index = HistoryStoreField.getHistoryIndexNameForTime(watchRecord.triggerEvent().triggeredTime()); - putUpdateLock.lock(); try { try (XContentBuilder builder = XContentFactory.jsonBuilder(); ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(WATCHER_ORIGIN)) { @@ -467,8 +464,6 @@ private void forcePutHistory(WatchRecord watchRecord) { } catch (InterruptedException | ExecutionException | TimeoutException | IOException ioe) { final WatchRecord wr = watchRecord; logger.error((Supplier) () -> new ParameterizedMessage("failed to persist watch record [{}]", wr), ioe); - } finally { - putUpdateLock.unlock(); } } From ce7f7e040ffcf871455095b4f482df22e5cfc5a0 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 25 Apr 2019 10:41:35 -0500 Subject: [PATCH 7/7] fix imports --- .../xpack/watcher/execution/ExecutionService.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 83f5e10037711..76f8fc14e2635 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.watcher.execution; import com.google.common.collect.Iterables; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -81,9 +80,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.xpack.core.ClientHelper.WATCHER_ORIGIN;