From 195d0f36a5c57a1a5b41077ecc23557f42680d07 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 9 Aug 2019 15:54:46 -0500 Subject: [PATCH 01/12] [ML][Data Frame] add support for wait_for_checkpoint flag on _stop --- .../client/DataFrameRequestConverters.java | 4 + .../StopDataFrameTransformRequest.java | 23 +++-- .../DataFrameRequestConvertersTests.java | 15 ++- .../client/DataFrameTransformIT.java | 4 +- .../DataFrameTransformDocumentationIT.java | 2 +- .../data-frames/apis/stop-transform.asciidoc | 5 + .../xpack/core/dataframe/DataFrameField.java | 1 + .../action/StopDataFrameTransformAction.java | 27 +++++- .../transforms/DataFrameTransformState.java | 32 ++++++- .../DataFrameTransformTaskState.java | 1 + ...pDataFrameTransformActionRequestTests.java | 16 +++- .../DataFrameTransformStateTests.java | 26 +++++- .../integration/DataFrameIntegTestCase.java | 13 ++- .../integration/DataFrameTransformIT.java | 43 +++++++++ ...ansportDeleteDataFrameTransformAction.java | 2 +- ...TransportStopDataFrameTransformAction.java | 16 +++- .../RestStopDataFrameTransformAction.java | 4 +- ...FrameTransformPersistentTasksExecutor.java | 7 +- .../transforms/DataFrameTransformTask.java | 93 ++++++++++++++++--- 19 files changed, 289 insertions(+), 45 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java index 375c0a7c3afd7..a309b80cfa2a3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java @@ -41,6 +41,7 @@ import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE; import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH; import static org.elasticsearch.client.dataframe.PutDataFrameTransformRequest.DEFER_VALIDATION; +import static org.elasticsearch.client.dataframe.StopDataFrameTransformRequest.WAIT_FOR_CHECKPOINT; final class DataFrameRequestConverters { @@ -135,6 +136,9 @@ static Request stopDataFrameTransform(StopDataFrameTransformRequest stopRequest) if (stopRequest.getAllowNoMatch() != null) { request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString()); } + if (stopRequest.getWaitForCheckpoint() != null) { + request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString()); + } request.addParameters(params.asMap()); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequest.java index 4fb6164f2cca9..8e6844eedc56b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformRequest.java @@ -28,21 +28,23 @@ public class StopDataFrameTransformRequest implements Validatable { + public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint"; + private final String id; private Boolean waitForCompletion; private TimeValue timeout; private Boolean allowNoMatch; + private Boolean waitForCheckpoint; public StopDataFrameTransformRequest(String id) { - this.id = id; - waitForCompletion = null; - timeout = null; + this(id, null, null, null); } - public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) { + public StopDataFrameTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) { this.id = id; this.waitForCompletion = waitForCompletion; this.timeout = timeout; + this.waitForCheckpoint = waitForCheckpoint; } public String getId() { @@ -73,6 +75,14 @@ public void setAllowNoMatch(Boolean allowNoMatch) { this.allowNoMatch = allowNoMatch; } + public Boolean getWaitForCheckpoint() { + return waitForCheckpoint; + } + + public void setWaitForCheckpoint(Boolean waitForCheckpoint) { + this.waitForCheckpoint = waitForCheckpoint; + } + @Override public Optional validate() { if (id == null) { @@ -86,7 +96,7 @@ public Optional validate() { @Override public int hashCode() { - return Objects.hash(id, waitForCompletion, timeout, allowNoMatch); + return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint); } @Override @@ -102,7 +112,8 @@ public boolean equals(Object obj) { return Objects.equals(this.id, other.id) && Objects.equals(this.waitForCompletion, other.waitForCompletion) && Objects.equals(this.timeout, other.timeout) - && Objects.equals(this.allowNoMatch, other.allowNoMatch); + && Objects.equals(this.allowNoMatch, other.allowNoMatch) + && Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java index 529b1ea6235ec..055865be87ead 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java @@ -149,7 +149,13 @@ public void testStopDataFrameTransform() { if (randomBoolean()) { timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout"); } - StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue); + + Boolean waitForCheckpoint = null; + if (randomBoolean()) { + waitForCheckpoint = randomBoolean(); + } + + StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint); Request request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest); assertEquals(HttpPost.METHOD_NAME, request.getMethod()); @@ -169,6 +175,13 @@ public void testStopDataFrameTransform() { assertFalse(request.getParameters().containsKey("timeout")); } + if (waitForCheckpoint != null) { + assertTrue(request.getParameters().containsKey("wait_for_checkpoint")); + assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint"))); + } else { + assertFalse(request.getParameters().containsKey("wait_for_checkpoint")); + } + assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH)); stopRequest.setAllowNoMatch(randomBoolean()); request = DataFrameRequestConverters.stopDataFrameTransform(stopRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 9566eba41e1f9..c1b45799e5cf6 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -147,7 +147,7 @@ private void indexData(String indexName) throws IOException { public void cleanUpTransforms() throws Exception { for (String transformId : transformsToClean) { highLevelClient().dataFrame().stopDataFrameTransform( - new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT); + new StopDataFrameTransformRequest(transformId, true, null, false), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { @@ -310,7 +310,7 @@ public void testStartStop() throws IOException { assertThat(taskState, oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING, DataFrameTransformStats.State.STOPPING, DataFrameTransformStats.State.STOPPED)); - StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null); + StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, true, null, false); StopDataFrameTransformResponse stopResponse = execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync); assertTrue(stopResponse.isAcknowledged()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 8dedfd83af7b4..85632b92efd23 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -81,7 +81,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest public void cleanUpTransforms() throws Exception { for (String transformId : transformsToClean) { highLevelClient().dataFrame().stopDataFrameTransform( - new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT); + new StopDataFrameTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { diff --git a/docs/reference/data-frames/apis/stop-transform.asciidoc b/docs/reference/data-frames/apis/stop-transform.asciidoc index f6f7784ebb91f..d99a34cbba9ba 100644 --- a/docs/reference/data-frames/apis/stop-transform.asciidoc +++ b/docs/reference/data-frames/apis/stop-transform.asciidoc @@ -81,6 +81,11 @@ are no matches or only partial matches. state completely stops. If set to `false`, the API returns immediately and the indexer will be stopped asynchronously in the background. Defaults to `false`. +`wait_for_checkpoint`:: + (Optional, boolean) If set to `true`, the transform will not completely stop + until the current checkpoint is completed. If set to `false`, the transform + stops as soon as possible. Defaults to `true`. + [[stop-data-frame-transform-response-codes]] ==== {api-response-codes-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index c0c209a9b542b..d10e0e5a9c973 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -22,6 +22,7 @@ public final class DataFrameField { public static final ParseField GROUP_BY = new ParseField("group_by"); public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion"); + public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint"); public static final ParseField STATS_FIELD = new ParseField("stats"); public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java index eef244551a33c..137abf3037357 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformAction.java @@ -46,11 +46,17 @@ private StopDataFrameTransformAction() { public static class Request extends BaseTasksRequest { private final String id; private final boolean waitForCompletion; + private final boolean waitForCheckpoint; private final boolean force; private final boolean allowNoMatch; private Set expandedIds; - public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) { + public Request(String id, + boolean waitForCompletion, + boolean force, + @Nullable TimeValue timeout, + boolean allowNoMatch, + boolean waitForCheckpoint) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); this.waitForCompletion = waitForCompletion; this.force = force; @@ -58,6 +64,7 @@ public Request(String id, boolean waitForCompletion, boolean force, @Nullable Ti // use the timeout value already present in BaseTasksRequest this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout); this.allowNoMatch = allowNoMatch; + this.waitForCheckpoint = waitForCheckpoint; } public Request(StreamInput in) throws IOException { @@ -73,6 +80,11 @@ public Request(StreamInput in) throws IOException { } else { this.allowNoMatch = true; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.waitForCheckpoint = in.readBoolean(); + } else { + this.waitForCheckpoint = true; + } } public String getId() { @@ -99,6 +111,10 @@ public boolean isAllowNoMatch() { return allowNoMatch; } + public boolean isWaitForCheckpoint() { + return waitForCheckpoint; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -113,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeBoolean(allowNoMatch); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(waitForCheckpoint); + } } @Override @@ -123,7 +142,7 @@ public ActionRequestValidationException validate() { @Override public int hashCode() { // the base class does not implement hashCode, therefore we need to hash timeout ourselves - return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch); + return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint); } @Override @@ -146,7 +165,8 @@ public boolean equals(Object obj) { Objects.equals(waitForCompletion, other.waitForCompletion) && Objects.equals(force, other.force) && Objects.equals(expandedIds, other.expandedIds) && - allowNoMatch == other.allowNoMatch; + allowNoMatch == other.allowNoMatch && + waitForCheckpoint == other.waitForCheckpoint; } @Override @@ -157,7 +177,6 @@ public boolean match(Task task) { return expandedIds.contains(id); } } - return false; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java index 6cc058e5acdff..2d5ed95c669a6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java @@ -43,6 +43,9 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState @Nullable private NodeAttributes node; + // TODO: 8.x this needs to be deprecated and we move towards a STOPPING TASK_STATE + private boolean shouldStopAtCheckpoint; + public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); @@ -54,6 +57,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public static final ParseField PROGRESS = new ParseField("progress"); public static final ParseField NODE = new ParseField("node"); + @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, @@ -93,7 +97,8 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState, long checkpoint, @Nullable String reason, @Nullable DataFrameTransformProgress progress, - @Nullable NodeAttributes node) { + @Nullable NodeAttributes node, + boolean shouldStopAtCheckpoint) { this.taskState = taskState; this.indexerState = indexerState; this.position = position; @@ -101,6 +106,17 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState, this.reason = reason; this.progress = progress; this.node = node; + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + } + + public DataFrameTransformState(DataFrameTransformTaskState taskState, + IndexerState indexerState, + @Nullable DataFrameIndexerPosition position, + long checkpoint, + @Nullable String reason, + @Nullable DataFrameTransformProgress progress, + @Nullable NodeAttributes node) { + this(taskState, indexerState, position, checkpoint, reason, progress, node, false); } public DataFrameTransformState(DataFrameTransformTaskState taskState, @@ -129,6 +145,9 @@ public DataFrameTransformState(StreamInput in) throws IOException { } else { node = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + shouldStopAtCheckpoint = in.readBoolean(); + } } public DataFrameTransformTaskState getTaskState() { @@ -164,6 +183,14 @@ public DataFrameTransformState setNode(NodeAttributes node) { return this; } + public boolean shouldStopAtCheckpoint() { + return shouldStopAtCheckpoint; + } + + public void setShouldStopAtCheckoint(boolean shouldStopAtCheckpoint) { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + } + public static DataFrameTransformState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -214,6 +241,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeOptionalWriteable(node); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(shouldStopAtCheckpoint); + } } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java index 795daca61ace6..da4fd6c12c622 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTaskState.java @@ -14,6 +14,7 @@ import java.util.Locale; public enum DataFrameTransformTaskState implements Writeable { + // TODO 8.x add a `STOPPING` state and BWC handling in ::fromString STOPPED, STARTED, FAILED; public static DataFrameTransformTaskState fromString(String name) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java index cce889baa9675..33c50735c56cb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StopDataFrameTransformActionRequestTests.java @@ -24,7 +24,12 @@ public class StopDataFrameTransformActionRequestTests extends AbstractWireSerial @Override protected Request createTestInstance() { TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null; - Request request = new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout, randomBoolean()); + Request request = new Request(randomAlphaOfLengthBetween(1, 10), + randomBoolean(), + randomBoolean(), + timeout, + randomBoolean(), + randomBoolean()); if (randomBoolean()) { request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false)))); } @@ -41,9 +46,10 @@ public void testSameButDifferentTimeout() { boolean waitForCompletion = randomBoolean(); boolean force = randomBoolean(); boolean allowNoMatch = randomBoolean(); + boolean waitForCheckpoint = randomBoolean(); - Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch); - Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch); + Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch, waitForCheckpoint); + Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch, waitForCheckpoint); assertNotEquals(r1,r2); assertNotEquals(r1.hashCode(),r2.hashCode()); @@ -56,11 +62,11 @@ public void testMatch() { DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId, TaskId.EMPTY_TASK_ID, Collections.emptyMap()); - Request request = new Request("unrelated", false, false, null, false); + Request request = new Request("unrelated", false, false, null, false, false); request.setExpandedIds(Set.of("foo", "bar")); assertFalse(request.match(dataFrameTask)); - Request matchingRequest = new Request(dataFrameId, false, false, null, false); + Request matchingRequest = new Request(dataFrameId, false, false, null, false, false); matchingRequest.setExpandedIds(Set.of(dataFrameId)); assertTrue(matchingRequest.match(dataFrameTask)); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java index cc6fe88e5b273..3753075d9d1ee 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.core.dataframe.transforms; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -26,7 +29,8 @@ public static DataFrameTransformState randomDataFrameTransformState() { randomLongBetween(0,10), randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomDataFrameTransformProgress(), - randomBoolean() ? null : randomNodeAttributes()); + randomBoolean() ? null : randomNodeAttributes(), + randomBoolean()); } @Override @@ -53,4 +57,24 @@ protected boolean supportsUnknownFields() { protected Predicate getRandomFieldsExcludeFilter() { return field -> !field.isEmpty(); } + + public void testBackwardsSerialization() throws IOException { + DataFrameTransformState state = new DataFrameTransformState(randomFrom(DataFrameTransformTaskState.values()), + randomFrom(IndexerState.values()), + DataFrameIndexerPositionTests.randomDataFrameIndexerPosition(), + randomLongBetween(0,10), + randomBoolean() ? null : randomAlphaOfLength(10), + randomBoolean() ? null : randomDataFrameTransformProgress(), + randomBoolean() ? null : randomNodeAttributes(), + false); // Will be false after BWC deserialization + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_4_0); + state.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_4_0); + DataFrameTransformState streamedState = new DataFrameTransformState(in); + assertEquals(state, streamedState); + } + } + } } diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index 3e2ffd5857e3f..5cdb1fca06da5 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -86,8 +86,17 @@ protected void cleanUpTransforms() throws IOException { } protected StopDataFrameTransformResponse stopDataFrameTransform(String id) throws IOException { + return stopDataFrameTransform(id, true, null, false); + } + + protected StopDataFrameTransformResponse stopDataFrameTransform(String id, + boolean waitForCompletion, + TimeValue timeout, + boolean waitForCheckpoint) throws IOException { RestHighLevelClient restClient = new TestRestHighLevelClient(); - return restClient.dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(id, true, null), RequestOptions.DEFAULT); + return restClient.dataFrame() + .stopDataFrameTransform(new StopDataFrameTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), + RequestOptions.DEFAULT); } protected StartDataFrameTransformResponse startDataFrameTransform(String id, RequestOptions options) throws IOException { @@ -298,7 +307,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio .append("\"}"); bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); - if (i % 50 == 0) { + if (i % 100 == 0) { BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT); assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); bulk = new BulkRequest(indexName); diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java index b239d461b1c4c..9465d983288f4 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -235,6 +235,49 @@ public void testContinuousDataFrameTransformUpdate() throws Exception { deleteDataFrameTransform(config.getId()); } + public void testStopWaitForCheckpoint() throws Exception { + String indexName = "wait-for-checkpoint-reviews"; + String transformId = "data-frame-transform-wait-for-checkpoint"; + createReviewsIndex(indexName, 1000); + + Map groups = new HashMap<>(); + groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); + groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); + groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + + AggregatorFactories.Builder aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + DataFrameTransformConfig config = createTransformConfigBuilder(transformId, + groups, + aggs, + "reviews-by-user-business-day", + QueryBuilders.matchAllQuery(), + indexName) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .build(); + + assertTrue(putDataFrameTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + + // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop + stopDataFrameTransform(transformId, false, null, true); + + // Wait until the first checkpoint + waitUntilCheckpoint(config.getId(), 1L); + + // Even though we are continuous, we should be stopped now as we needed to stop at the first checkpoint + assertBusy(() -> { + DataFrameTransformStats stateAndStats = getDataFrameTransformStats(config.getId()).getTransformsStats().get(0); + assertThat(stateAndStats.getState(), equalTo(DataFrameTransformStats.State.STOPPED)); + assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(1000L)); + }); + + stopDataFrameTransform(config.getId()); + deleteDataFrameTransform(config.getId()); + } + private void indexMoreDocs(long timestamp, long userId, String index) throws Exception { BulkRequest bulk = new BulkRequest(index); for (int i = 0; i < 25; i++) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java index 25f8550766f2a..6a2f250323a3a 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java @@ -85,7 +85,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, StopDataFrameTransformAction.INSTANCE, - new StopDataFrameTransformAction.Request(request.getId(), true, true, null, true), + new StopDataFrameTransformAction.Request(request.getId(), true, true, null, true, false), ActionListener.wrap( r -> stopTransformActionListener.onResponse(null), stopTransformActionListener::onFailure)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index c2ddcdb9bac2b..397f20e9215a9 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -131,8 +131,9 @@ protected void doExecute(Task task, StopDataFrameTransformAction.Request request } @Override - protected void taskOperation(StopDataFrameTransformAction.Request request, DataFrameTransformTask transformTask, - ActionListener listener) { + protected void taskOperation(StopDataFrameTransformAction.Request request, + DataFrameTransformTask transformTask, + ActionListener listener) { Set ids = request.getExpandedIds(); if (ids == null) { @@ -153,8 +154,15 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF return; } - transformTask.stop(); - listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); + // To cover strange state race conditions, we adjust the variable first (which writes to cluster state if it is different) + // then we stop the transform + transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap( + r -> { + transformTask.stop(); + listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); + }, + listener::onFailure + )); } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java index ab7b1b464d6c9..b22c07e21744d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestStopDataFrameTransformAction.java @@ -32,13 +32,15 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient boolean waitForCompletion = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), false); boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false); boolean allowNoMatch = restRequest.paramAsBoolean(DataFrameField.ALLOW_NO_MATCH.getPreferredName(), false); + boolean waitForCheckpoint = restRequest.paramAsBoolean(DataFrameField.WAIT_FOR_CHECKPOINT.getPreferredName(), true); StopDataFrameTransformAction.Request request = new StopDataFrameTransformAction.Request(id, waitForCompletion, force, timeout, - allowNoMatch); + allowNoMatch, + waitForCheckpoint); return channel -> client.execute(StopDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 4ce0b5d7ed57d..2af378b6a58ae 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -127,6 +127,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr if (transformPTaskState != null && transformPTaskState.getTaskState() == DataFrameTransformTaskState.FAILED) { return; } + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + final boolean shouldStopAtCheckpoint = transformPTaskState != null && transformPTaskState.shouldStopAtCheckpoint(); final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder = new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId) @@ -192,6 +194,9 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr ActionListener transformStatsActionListener = ActionListener.wrap( stateAndStats -> { logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString()); + DataFrameTransformState transformState = stateAndStats.getTransformState(); + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + transformState.setShouldStopAtCheckoint(shouldStopAtCheckpoint); indexerBuilder.setInitialStats(stateAndStats.getTransformStats()) .setInitialPosition(stateAndStats.getTransformState().getPosition()) .setProgress(stateAndStats.getTransformState().getProgress()) @@ -201,7 +206,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr stateAndStats.getTransformState(), stateAndStats.getTransformState().getPosition()); - stateHolder.set(stateAndStats.getTransformState()); + stateHolder.set(transformState); final long lastCheckpoint = stateHolder.get().getCheckpoint(); if (lastCheckpoint == 0) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index e70687592e92b..acfc1e5a6569d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -86,6 +86,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final DataFrameAuditor auditor; private final DataFrameIndexerPosition initialPosition; private final IndexerState initialIndexerState; + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + private volatile boolean shouldStopAtCheckpoint = false; private final SetOnce indexer = new SetOnce<>(); @@ -149,23 +151,24 @@ private ClientDataFrameIndexer getIndexer() { } public DataFrameTransformState getState() { - if (getIndexer() == null) { - return new DataFrameTransformState( - taskState.get(), + return getIndexer() == null ? + new DataFrameTransformState(taskState.get(), initialIndexerState, initialPosition, currentCheckpoint.get(), stateReason.get(), - null); - } else { - return new DataFrameTransformState( + null, + null, //Node attributes + shouldStopAtCheckpoint) : + new DataFrameTransformState( taskState.get(), indexer.get().getState(), indexer.get().getPosition(), currentCheckpoint.get(), stateReason.get(), - getIndexer().getProgress()); - } + getIndexer().getProgress(), + null, //Node attributes + shouldStopAtCheckpoint); } public DataFrameIndexerTransformStats getStats() { @@ -237,6 +240,10 @@ public synchronized void start(Long startingCheckpoint, ActionListener getTransformId())); return; } + if (shouldStopAtCheckpoint) { + listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], " + + "because it is stopping at the next checkpoint.", transform.getId())); + } final IndexerState newState = getIndexer().start(); if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) { listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]", @@ -280,6 +287,34 @@ public synchronized void start(Long startingCheckpoint, ActionListener )); } + /** + * This sets the flag for the task to stop at the next checkpoint. + * + * If first persists the flag to cluster state, and then mutates the local variable. + * + * It only persists to cluster state if the value is different than what is currently held in memory. + * @param shouldStopAtCheckpoint whether or not we should stop at the next checkpoint or not + * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to cluster state. + */ + public void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener shouldStopAtCheckpointListener) { + DataFrameTransformState state = getState(); + if (state.shouldStopAtCheckpoint() != shouldStopAtCheckpoint && state.getTaskState() != DataFrameTransformTaskState.FAILED) { + state.setShouldStopAtCheckoint(shouldStopAtCheckpoint); + persistStateToClusterState(state, ActionListener.wrap( + r -> { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + if (shouldStopAtCheckpoint) { + stateReason.set("Transform is stopping at next checkpoint."); + } + shouldStopAtCheckpointListener.onResponse(null); + }, + shouldStopAtCheckpointListener::onFailure + )); + } else { + shouldStopAtCheckpointListener.onResponse(null); + } + } + public synchronized void stop() { if (getIndexer() == null) { // If there is no indexer the task has not been triggered @@ -288,15 +323,19 @@ public synchronized void stop() { return; } - if (getIndexer().getState() == IndexerState.STOPPED) { + if (getIndexer().getState() == IndexerState.STOPPED || getIndexer().getState() == IndexerState.STOPPING) { return; } - - IndexerState state = getIndexer().stop(); - stateReason.set(null); - if (state == IndexerState.STOPPED) { - getIndexer().onStop(); - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); + // shouldStopAtCheckpoint only comes into play when onFinish is called, if the indexerState is STARTED, that is a good + // indication that the indexer is not running and has previously finished a checkpoint, or has yet to even start one. + // Either way, this means that we won't get to have onFinish called down stream. We should just stop the indexer + if (this.shouldStopAtCheckpoint == false || getIndexer().getState() == IndexerState.STARTED) { + stateReason.set(null); + IndexerState state = getIndexer().stop(); + if (state == IndexerState.STOPPED) { + getIndexer().onStop(); + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); + } } } @@ -313,6 +352,18 @@ public synchronized void triggered(Event event) { } logger.debug("Data frame indexer [{}] schedule has triggered, state: [{}]", event.getJobName(), getIndexer().getState()); + // If we are failed, don't trigger + if (taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.debug("Schedule was triggered for transform [{}] but task is failed. Ignoring trigger.", getTransformId()); + return; + } + // If we are stopped or stopping, don't trigger + if (getIndexer().getState() == IndexerState.STOPPING || getIndexer().getState() == IndexerState.STOPPED) { + logger.debug("Schedule was triggered for transform [{}] but indexer is [{}]. Ignoring trigger.", + getTransformId(), + getIndexer().getState()); + return; + } // if it runs for the 1st time we just do it, if not we check for changes if (currentCheckpoint.get() == 0 ) { @@ -379,6 +430,10 @@ synchronized void markAsFailed(String reason, ActionListener listener) { currentCheckpoint.get(), reason, getIndexer() == null ? null : getIndexer().getProgress()); + // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again, + // we should set this flag to false. + // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true + newState.setShouldStopAtCheckoint(false); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // This keeps track of STARTED, FAILED, STOPPED // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that @@ -387,12 +442,14 @@ synchronized void markAsFailed(String reason, ActionListener listener) { r -> { taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); + shouldStopAtCheckpoint = false; listener.onResponse(null); }, e -> { logger.error("Failed to set task state as failed to cluster state", e); taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); + shouldStopAtCheckpoint = false; listener.onFailure(e); } )); @@ -824,6 +881,7 @@ protected void onFinish(ActionListener listener) { nextCheckpoint = null; // Reset our failure count as we have finished and may start again with a new checkpoint failureCount.set(0); + transformTask.stateReason.set(null); // TODO: progress hack to get around bucket_selector filtering out buckets // With bucket_selector we could have read all the buckets and completed the transform @@ -841,6 +899,11 @@ protected void onFinish(ActionListener listener) { logger.debug( "Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]"); auditBulkFailures = true; + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + if (transformTask.shouldStopAtCheckpoint) { + stop(); + transformTask.shouldStopAtCheckpoint = false; + } listener.onResponse(null); } catch (Exception e) { listener.onFailure(e); From d87f3ac69b8f15c0f62bfec242ce1f37d7e0eb81 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Mon, 19 Aug 2019 11:42:26 -0500 Subject: [PATCH 02/12] intermediate commit --- .../transforms/DataFrameTransformState.java | 38 +++++++----- ...FrameTransformPersistentTasksExecutor.java | 4 +- .../transforms/DataFrameTransformTask.java | 58 ++++++++++++++----- 3 files changed, 67 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java index 2d5ed95c669a6..fc2668dbac193 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformState.java @@ -44,7 +44,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState private NodeAttributes node; // TODO: 8.x this needs to be deprecated and we move towards a STOPPING TASK_STATE - private boolean shouldStopAtCheckpoint; + private final boolean shouldStopAtNextCheckpoint; public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); @@ -56,6 +56,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState public static final ParseField REASON = new ParseField("reason"); public static final ParseField PROGRESS = new ParseField("progress"); public static final ParseField NODE = new ParseField("node"); + public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint"); @SuppressWarnings("unchecked") @@ -76,8 +77,16 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState String reason = (String) args[5]; DataFrameTransformProgress progress = (DataFrameTransformProgress) args[6]; NodeAttributes node = (NodeAttributes) args[7]; - - return new DataFrameTransformState(taskState, indexerState, dataFrameIndexerPosition, checkpoint, reason, progress, node); + boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean)args[8]; + + return new DataFrameTransformState(taskState, + indexerState, + dataFrameIndexerPosition, + checkpoint, + reason, + progress, + node, + shouldStopAtNextCheckpoint); }); static { @@ -89,6 +98,7 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState PARSER.declareString(optionalConstructorArg(), REASON); PARSER.declareField(optionalConstructorArg(), DataFrameTransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT); + PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT); } public DataFrameTransformState(DataFrameTransformTaskState taskState, @@ -98,7 +108,7 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState, @Nullable String reason, @Nullable DataFrameTransformProgress progress, @Nullable NodeAttributes node, - boolean shouldStopAtCheckpoint) { + boolean shouldStopAtNextCheckpoint) { this.taskState = taskState; this.indexerState = indexerState; this.position = position; @@ -106,7 +116,7 @@ public DataFrameTransformState(DataFrameTransformTaskState taskState, this.reason = reason; this.progress = progress; this.node = node; - this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint; } public DataFrameTransformState(DataFrameTransformTaskState taskState, @@ -146,7 +156,9 @@ public DataFrameTransformState(StreamInput in) throws IOException { node = null; } if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - shouldStopAtCheckpoint = in.readBoolean(); + shouldStopAtNextCheckpoint = in.readBoolean(); + } else { + shouldStopAtNextCheckpoint = false; } } @@ -183,12 +195,8 @@ public DataFrameTransformState setNode(NodeAttributes node) { return this; } - public boolean shouldStopAtCheckpoint() { - return shouldStopAtCheckpoint; - } - - public void setShouldStopAtCheckoint(boolean shouldStopAtCheckpoint) { - this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + public boolean shouldStopAtNextCheckpoint() { + return shouldStopAtNextCheckpoint; } public static DataFrameTransformState fromXContent(XContentParser parser) { @@ -217,6 +225,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (node != null) { builder.field(NODE.getPreferredName(), node); } + builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint); builder.endObject(); return builder; } @@ -242,7 +251,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(node); } if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeBoolean(shouldStopAtCheckpoint); + out.writeBoolean(shouldStopAtNextCheckpoint); } } @@ -264,12 +273,13 @@ public boolean equals(Object other) { this.checkpoint == that.checkpoint && Objects.equals(this.reason, that.reason) && Objects.equals(this.progress, that.progress) && + Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) && Objects.equals(this.node, that.node); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node); + return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 186dcdd0015a5..decb4684503e7 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -127,8 +127,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr if (transformPTaskState != null && transformPTaskState.getTaskState() == DataFrameTransformTaskState.FAILED) { return; } - // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING - final boolean shouldStopAtCheckpoint = transformPTaskState != null && transformPTaskState.shouldStopAtCheckpoint(); final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder = new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId) @@ -196,7 +194,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString()); DataFrameTransformState transformState = stateAndStats.getTransformState(); // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING - transformState.setShouldStopAtCheckoint(shouldStopAtCheckpoint); indexerBuilder.setInitialStats(stateAndStats.getTransformStats()) .setInitialPosition(stateAndStats.getTransformState().getPosition()) .setProgress(stateAndStats.getTransformState().getProgress()) @@ -207,6 +204,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr stateAndStats.getTransformState().getPosition()); stateHolder.set(transformState); + buildTask.setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint()); final long lastCheckpoint = stateHolder.get().getCheckpoint(); if (lastCheckpoint == 0) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index be45068cd3e5c..fb09f9dc9ce70 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -319,25 +319,50 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis * @param shouldStopAtCheckpoint whether or not we should stop at the next checkpoint or not * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to cluster state. */ - public void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener shouldStopAtCheckpointListener) { - DataFrameTransformState state = getState(); - if (state.shouldStopAtCheckpoint() != shouldStopAtCheckpoint && state.getTaskState() != DataFrameTransformTaskState.FAILED) { - state.setShouldStopAtCheckoint(shouldStopAtCheckpoint); - persistStateToClusterState(state, ActionListener.wrap( - r -> { - this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; - if (shouldStopAtCheckpoint) { - stateReason.set("Transform is stopping at next checkpoint."); + public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, + ActionListener shouldStopAtCheckpointListener) { + logger.debug("[{}] attempted to set task to stop at checkpoint with state [{}]", getTransformId(), getState()); + if (this.shouldStopAtCheckpoint == shouldStopAtCheckpoint || + taskState.get() == DataFrameTransformTaskState.STARTED == false || + getIndexer() == null || + getIndexer().getState() == IndexerState.STOPPED || + getIndexer().getState() == IndexerState.STOPPING) { + } + if (this.shouldStopAtCheckpoint != shouldStopAtCheckpoint + && taskState.get() != DataFrameTransformTaskState.FAILED + && getIndexer() != null + && getIndexer().getState() != IndexerState.STOPPED) { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + DataFrameTransformState state = new DataFrameTransformState( + taskState.get(), + indexer.get().getState(), + indexer.get().getPosition(), + currentCheckpoint.get(), + stateReason.get(), + getIndexer().getProgress(), + null, //Node attributes + shouldStopAtCheckpoint); + getIndexer().transformsConfigManager.putOrUpdateTransformStoredDoc( + new DataFrameTransformStoredDoc(getTransformId(), state, getStats()), + ActionListener.wrap( + r -> { + // for auto stop shutdown the task + shouldStopAtCheckpointListener.onResponse(null); + }, + statsExc -> { + logger.error("Updating stats of transform [" + getTransformId() + "] failed", statsExc); + shouldStopAtCheckpointListener.onFailure(statsExc); } - shouldStopAtCheckpointListener.onResponse(null); - }, - shouldStopAtCheckpointListener::onFailure - )); + )); } else { shouldStopAtCheckpointListener.onResponse(null); } } + public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + } + public synchronized void stop(boolean force) { logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState()); if (getIndexer() == null) { @@ -473,19 +498,20 @@ synchronized void markAsFailed(String reason, ActionListener listener) { // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. deregisterSchedulerJob(); + shouldStopAtCheckpoint = false; DataFrameTransformState newState = new DataFrameTransformState( DataFrameTransformTaskState.FAILED, getIndexer() == null ? initialIndexerState : getIndexer().getState(), getIndexer() == null ? initialPosition : getIndexer().getPosition(), currentCheckpoint.get(), reason, - getIndexer() == null ? null : getIndexer().getProgress()); + getIndexer() == null ? null : getIndexer().getProgress(), + null, + false); // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again, // we should set this flag to false. // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true - newState.setShouldStopAtCheckoint(false); - shouldStopAtCheckpoint = false; taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate From a8dcefba337041fc2a015542f54f9cf4910c69be Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 28 Aug 2019 11:08:07 -0500 Subject: [PATCH 03/12] minor fix --- .../transforms/DataFrameTransformTask.java | 46 ++++++++----------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 6bef1f0ade91c..4cbb2098e9928 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -276,6 +276,7 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis if (shouldStopAtCheckpoint) { listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], " + "because it is stopping at the next checkpoint.", transform.getId())); + return; } final IndexerState newState = getIndexer().start(); if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) { @@ -340,36 +341,25 @@ public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoin getIndexer() == null || getIndexer().getState() == IndexerState.STOPPED || getIndexer().getState() == IndexerState.STOPPING) { - } - if (this.shouldStopAtCheckpoint != shouldStopAtCheckpoint - && taskState.get() != DataFrameTransformTaskState.FAILED - && getIndexer() != null - && getIndexer().getState() != IndexerState.STOPPED) { - this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; - DataFrameTransformState state = new DataFrameTransformState( - taskState.get(), - indexer.get().getState(), - indexer.get().getPosition(), - currentCheckpoint.get(), - stateReason.get(), - getIndexer().getProgress(), - null, //Node attributes - shouldStopAtCheckpoint); - getIndexer().transformsConfigManager.putOrUpdateTransformStoredDoc( - new DataFrameTransformStoredDoc(getTransformId(), state, getStats()), - ActionListener.wrap( - r -> { - // for auto stop shutdown the task - shouldStopAtCheckpointListener.onResponse(null); - }, - statsExc -> { - logger.error("Updating stats of transform [" + getTransformId() + "] failed", statsExc); - shouldStopAtCheckpointListener.onFailure(statsExc); - } - )); - } else { shouldStopAtCheckpointListener.onResponse(null); + return; } + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + DataFrameTransformState state = getState(); + getIndexer().transformsConfigManager.putOrUpdateTransformStoredDoc( + new DataFrameTransformStoredDoc(getTransformId(), state, getStats()), + ActionListener.wrap( + r -> { + // for auto stop shutdown the task + shouldStopAtCheckpointListener.onResponse(null); + }, + statsExc -> { + logger.error("Updating stats of transform [" + getTransformId() + "] failed", statsExc); + // TODO do we want to unset it if we are returning a failure here? + this.shouldStopAtCheckpoint = !shouldStopAtCheckpoint; + shouldStopAtCheckpointListener.onFailure(statsExc); + } + )); } public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { From b467f2273db07433940beba5a3f8f92af940c835 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 28 Aug 2019 14:44:30 -0500 Subject: [PATCH 04/12] further bug fixes and race condition fixes --- ...TransportStopDataFrameTransformAction.java | 14 +- .../persistence/DataFrameInternalIndex.java | 7 +- ...FrameTransformPersistentTasksExecutor.java | 7 +- .../transforms/DataFrameTransformTask.java | 146 ++++++++++-------- 4 files changed, 95 insertions(+), 79 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 66f61117d17cd..aadee20e9fca6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -141,20 +141,18 @@ protected void taskOperation(Request request, DataFrameTransformTask transformTa } if (ids.contains(transformTask.getTransformId())) { - // To cover strange state race conditions, we adjust the variable first (which writes to cluster state if it is different) + // To cover strange state race conditions, we attempt which writes to cluster state if it is different // then we stop the transform - transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap( - r -> { + transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), () -> { try { - transformTask.stop(request.isForce()); + transformTask.stop(request.isForce(), request.isWaitForCheckpoint()); } catch (ElasticsearchException ex) { listener.onFailure(ex); return; } - listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE)); - }, - listener::onFailure - )); + listener.onResponse(new StopDataFrameTransformAction.Response(true)); + } + ); } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java index cad4f0cba484d..a93b973cd7d7c 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java @@ -40,10 +40,11 @@ public final class DataFrameInternalIndex { * progress::docs_processed, progress::docs_indexed, * stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed, * stats::exponential_avg_documents_processed + * version 3 (7.5): state::should_stop_at_checkpoint */ // constants for the index - public static final String INDEX_VERSION = "2"; + public static final String INDEX_VERSION = "3"; public static final String INDEX_PATTERN = ".data-frame-internal-"; public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION; public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME; @@ -68,6 +69,7 @@ public final class DataFrameInternalIndex { public static final String DOUBLE = "double"; public static final String LONG = "long"; public static final String KEYWORD = "keyword"; + public static final String BOOLEAN = "boolean"; public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException { IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(LATEST_INDEX_VERSIONED_NAME) @@ -167,6 +169,9 @@ private static XContentBuilder addDataFrameTransformStoredDocMappings(XContentBu .startObject(DataFrameTransformState.INDEXER_STATE.getPreferredName()) .field(TYPE, KEYWORD) .endObject() + .startObject(DataFrameTransformState.SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName()) + .field(TYPE, BOOLEAN) + .endObject() .startObject(DataFrameTransformState.CURRENT_POSITION.getPreferredName()) .field(ENABLED, false) .endObject() diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index c0d9d5671937e..be6a332ec043d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -137,6 +137,11 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr final SetOnce stateHolder = new SetOnce<>(); + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state; + final boolean shouldStopAtCheckpoint = transformPTaskState != null && transformPTaskState.shouldStopAtNextCheckpoint(); + buildTask.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); + ActionListener startTaskListener = ActionListener.wrap( response -> logger.info("Successfully completed and scheduled task in node operation"), failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure) @@ -193,7 +198,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr stateAndStats -> { logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString()); DataFrameTransformState transformState = stateAndStats.getTransformState(); - // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING indexerBuilder.setInitialStats(stateAndStats.getTransformStats()) .setInitialPosition(stateAndStats.getTransformState().getPosition()) .setProgress(stateAndStats.getTransformState().getProgress()) @@ -204,7 +208,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr stateAndStats.getTransformState().getPosition()); stateHolder.set(transformState); - buildTask.setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint()); final long lastCheckpoint = stateHolder.get().getCheckpoint(); if (lastCheckpoint == 0) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 4cbb2098e9928..78c5781517f2e 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM; import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; @@ -219,31 +220,6 @@ public void getCheckpointingInfo(DataFrameTransformsCheckpointService transforms )); } - public DataFrameTransformCheckpoint getLastCheckpoint() { - return getIndexer().getLastCheckpoint(); - } - - public DataFrameTransformCheckpoint getNextCheckpoint() { - return getIndexer().getNextCheckpoint(); - } - - /** - * Get the in-progress checkpoint - * - * @return checkpoint in progress or 0 if task/indexer is not active - */ - public long getInProgressCheckpoint() { - if (getIndexer() == null) { - return 0; - } else { - return indexer.get().getState().equals(IndexerState.INDEXING) ? currentCheckpoint.get() + 1L : 0; - } - } - - public synchronized void setTaskStateStopped() { - taskState.set(DataFrameTransformTaskState.STOPPED); - } - /** * Start the background indexer and set the task's state to started * @param startingCheckpoint Set the current checkpoint to this value. If null the @@ -260,6 +236,15 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis RestStatus.CONFLICT)); return; } + // If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again. + if (taskState.get() == DataFrameTransformTaskState.STARTED) { + listener.onFailure(new ElasticsearchStatusException( + "Cannot start transform [{}] as it is already STARTED.", + RestStatus.CONFLICT, + getTransformId() + )); + return; + } if (getIndexer() == null) { // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets // fully initialized. @@ -334,40 +319,48 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to cluster state. */ public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, - ActionListener shouldStopAtCheckpointListener) { - logger.debug("[{}] attempted to set task to stop at checkpoint with state [{}]", getTransformId(), getState()); + Runnable shouldStopAtCheckpointListener) { + logger.debug("[{}] attempted to set task to stop at checkpoint [{}] with state [{}]", + getTransformId(), + shouldStopAtCheckpoint, + getState()); if (this.shouldStopAtCheckpoint == shouldStopAtCheckpoint || taskState.get() == DataFrameTransformTaskState.STARTED == false || getIndexer() == null || getIndexer().getState() == IndexerState.STOPPED || getIndexer().getState() == IndexerState.STOPPING) { - shouldStopAtCheckpointListener.onResponse(null); + shouldStopAtCheckpointListener.run(); return; } this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; DataFrameTransformState state = getState(); - getIndexer().transformsConfigManager.putOrUpdateTransformStoredDoc( - new DataFrameTransformStoredDoc(getTransformId(), state, getStats()), + persistStateToClusterState(state, ActionListener.wrap( r -> { - // for auto stop shutdown the task - shouldStopAtCheckpointListener.onResponse(null); + logger.debug("[{}] successfully persisted to clusterstate should_stop_at_checkpoint update [{}]", + getTransformId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.run(); }, statsExc -> { - logger.error("Updating stats of transform [" + getTransformId() + "] failed", statsExc); - // TODO do we want to unset it if we are returning a failure here? - this.shouldStopAtCheckpoint = !shouldStopAtCheckpoint; - shouldStopAtCheckpointListener.onFailure(statsExc); + logger.warn("[{}] failed to persist to clusterstate should_stop_at_checkpoint update [{}]", + getTransformId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.run(); } )); } - public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { + synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; } - public synchronized void stop(boolean force) { - logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState()); + public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { + logger.debug("[{}] stop called with force [{}], shouldStopAtCheckpoint [{}], state [{}]", + getTransformId(), + force, + shouldStopAtCheckpoint, + getState()); if (getIndexer() == null) { // If there is no indexer the task has not been triggered // but it still needs to be stopped and removed @@ -387,13 +380,18 @@ public synchronized void stop(boolean force) { RestStatus.CONFLICT); } - // shouldStopAtCheckpoint only comes into play when onFinish is called, if the indexerState is STARTED, that is a good - // indication that the indexer is not running and has previously finished a checkpoint, or has yet to even start one. + stateReason.set(null); + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after), if the indexerState is STARTED and are on an initialRun, + // that is a good indication that the indexer is not running and has previously finished a checkpoint, or has yet to even start one. // Either way, this means that we won't get to have onFinish called down stream. We should just stop the indexer - taskState.set(DataFrameTransformTaskState.STARTED); - if (this.shouldStopAtCheckpoint == false || getIndexer().getState() == IndexerState.STARTED) { - stateReason.set(null); + if (shouldStopAtCheckpoint == false || (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) { IndexerState state = getIndexer().stop(); + // No reason to keep it in the potentially failed state. + // Since we have called `stop` against the indexer, we have no more fear of triggering again accidentally. + // But, since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be + // executed while we are wrapping up. + taskState.compareAndSet(DataFrameTransformTaskState.FAILED, DataFrameTransformTaskState.STARTED); if (state == IndexerState.STOPPED) { getIndexer().onStop(); getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); @@ -413,8 +411,10 @@ public synchronized void triggered(Event event) { return; } - if (taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.debug("[{}] schedule was triggered for transform but task is failed. Ignoring trigger.", getTransformId()); + if (taskState.get() == DataFrameTransformTaskState.FAILED || taskState.get() == DataFrameTransformTaskState.STOPPED) { + logger.debug("[{}] schedule was triggered for transform but task is [{}]. Ignoring trigger.", + getTransformId(), + taskState.get()); return; } @@ -431,7 +431,7 @@ public synchronized void triggered(Event event) { // if it runs for the 1st time we just do it, if not we check for changes if (currentCheckpoint.get() == 0) { - logger.debug("Trigger initial run."); + logger.debug("[{}] trigger initial run.", getTransformId()); getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); } else if (getIndexer().isContinuous()) { getIndexer().maybeTriggerAsyncJob(System.currentTimeMillis()); @@ -447,17 +447,6 @@ synchronized void shutdown() { markAsCompleted(); } - public DataFrameTransformProgress getProgress() { - if (indexer.get() == null) { - return null; - } - DataFrameTransformProgress indexerProgress = indexer.get().getProgress(); - if (indexerProgress == null) { - return null; - } - return new DataFrameTransformProgress(indexerProgress); - } - void persistStateToClusterState(DataFrameTransformState state, ActionListener> listener) { updatePersistentTaskState(state, ActionListener.wrap( @@ -501,9 +490,9 @@ synchronized void markAsFailed(String reason, ActionListener listener) { // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. deregisterSchedulerJob(); - shouldStopAtCheckpoint = false; // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again, // we should set this flag to false. + shouldStopAtCheckpoint = false; // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); @@ -925,33 +914,55 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p next.run(); return; } - // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` - // OR we called `doSaveState` manually as the indexer was not actively running. - // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state - if (indexerState.equals(IndexerState.STOPPED)) { - transformTask.setTaskStateStopped(); - } DataFrameTransformTaskState taskState = transformTask.taskState.get(); + boolean shouldStopAtCheckpoint = transformTask.shouldStopAtCheckpoint; if (indexerState.equals(IndexerState.STARTED) && transformTask.currentCheckpoint.get() == 1 && this.isContinuous() == false) { // set both to stopped so they are persisted as such - taskState = DataFrameTransformTaskState.STOPPED; indexerState = IndexerState.STOPPED; auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop"); logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId()); } + + // If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states + // 1. We have just called `onFinish` completing our request, but `shouldStopAtCheckpoint` was set to `true` before our check + // there and now + // 2. We are on the very first run of a NEW checkpoint and got here either through a failure, or the very first save state call. + // + // In either case, we should stop so that we guarantee a consistent state and that there are no partially completed checkpoints + if (shouldStopAtCheckpoint && initialRun() && indexerState.equals(IndexerState.STARTED)) { + indexerState = IndexerState.STOPPED; + auditor.info(transformConfig.getId(), "Data frame is no longer in the middle of a checkpoint, initiating stop."); + logger.info("[{}] data frame transform is no longer in the middle of a checkpoint, initiating stop.", + transformConfig.getId()); + } + + // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` + // OR we called `doSaveState` manually as the indexer was not actively running. + // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state + if (indexerState.equals(IndexerState.STOPPED)) { + taskState = DataFrameTransformTaskState.STOPPED; + // If we are going to stop after the state is saved, we should NOT persist `shouldStopAtCheckpoint: true` as this may + // cause problems if the task starts up again. + // Additionally, we don't have to worry about inconsistency with the ClusterState (if it is persisted there) as the + // when we stop, we mark the task as complete and that state goes away. + shouldStopAtCheckpoint = false; + } + final DataFrameTransformState state = new DataFrameTransformState( taskState, indexerState, position, transformTask.currentCheckpoint.get(), transformTask.stateReason.get(), - getProgress()); + getProgress(), + null, + shouldStopAtCheckpoint); logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); // Persist the current state and stats in the internal index. The interval of this method being @@ -1058,7 +1069,6 @@ protected void onFinish(ActionListener listener) { // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING if (transformTask.shouldStopAtCheckpoint) { stop(); - transformTask.shouldStopAtCheckpoint = false; } listener.onResponse(null); } catch (Exception e) { From 8ef6160c6d45a217af6feb9aa75252763d35e76f Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 28 Aug 2019 14:58:41 -0500 Subject: [PATCH 05/12] addressing formatting concerns --- .../action/TransportStopDataFrameTransformAction.java | 4 ++-- .../dataframe/transforms/DataFrameTransformTask.java | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index aadee20e9fca6..f034b527b8549 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -141,8 +141,8 @@ protected void taskOperation(Request request, DataFrameTransformTask transformTa } if (ids.contains(transformTask.getTransformId())) { - // To cover strange state race conditions, we attempt which writes to cluster state if it is different - // then we stop the transform + // To cover for node failure while waiting for the checkpoint to stop + // we write to cluster state if it is different and then we stop the transform transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), () -> { try { transformTask.stop(request.isForce(), request.isWaitForCheckpoint()); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 78c5781517f2e..de1fa849fc8c8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -63,7 +63,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_START_FAILED_TRANSFORM; import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; @@ -382,13 +381,15 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { stateReason.set(null); this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; - // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after), if the indexerState is STARTED and are on an initialRun, - // that is a good indication that the indexer is not running and has previously finished a checkpoint, or has yet to even start one. - // Either way, this means that we won't get to have onFinish called down stream. We should just stop the indexer + // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after). + // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint, + // or has yet to even start one. + // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time). + // We should just stop the indexer if (shouldStopAtCheckpoint == false || (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) { IndexerState state = getIndexer().stop(); // No reason to keep it in the potentially failed state. - // Since we have called `stop` against the indexer, we have no more fear of triggering again accidentally. + // Since we have called `stop` against the indexer, we have no more fear of triggering again. // But, since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be // executed while we are wrapping up. taskState.compareAndSet(DataFrameTransformTaskState.FAILED, DataFrameTransformTaskState.STARTED); From 0acffb32a425195b9f940d0e7541caaf65c7f3ce Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 3 Sep 2019 11:32:05 -0500 Subject: [PATCH 06/12] moving to save state into index and not cluster state --- ...TransportStopDataFrameTransformAction.java | 21 ++- ...FrameTransformPersistentTasksExecutor.java | 6 +- .../transforms/DataFrameTransformTask.java | 137 ++++++++++-------- 3 files changed, 90 insertions(+), 74 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index f034b527b8549..9039bb30b3443 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -141,17 +141,16 @@ protected void taskOperation(Request request, DataFrameTransformTask transformTa } if (ids.contains(transformTask.getTransformId())) { - // To cover for node failure while waiting for the checkpoint to stop - // we write to cluster state if it is different and then we stop the transform - transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), () -> { - try { - transformTask.stop(request.isForce(), request.isWaitForCheckpoint()); - } catch (ElasticsearchException ex) { - listener.onFailure(ex); - return; - } - listener.onResponse(new StopDataFrameTransformAction.Response(true)); - } + transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap( + r -> transformTask.stop(request.isForce(), request.isWaitForCheckpoint()), + e -> listener.onFailure( + new ElasticsearchStatusException( + "Failed to update transform task [{}] state value should_stop_at_checkpoint from [{}] to [{}]", + RestStatus.CONFLICT, + transformTask.getTransformId(), + transformTask.getState().shouldStopAtNextCheckpoint(), + request.isWaitForCheckpoint())) + ) ); } else { listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId() diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 783d6fdd0c6fa..173bb0654ea8f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -139,10 +139,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr final SetOnce stateHolder = new SetOnce<>(); - // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING - final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state; - final boolean shouldStopAtCheckpoint = transformPTaskState != null && transformPTaskState.shouldStopAtNextCheckpoint(); - buildTask.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); ActionListener startTaskListener = ActionListener.wrap( response -> logger.info("Successfully completed and scheduled task in node operation"), @@ -215,6 +211,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr stateHolder.set(transformState); final long lastCheckpoint = stateHolder.get().getCheckpoint(); + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + buildTask.setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint()); if (lastCheckpoint == 0) { logger.trace("[{}] No last checkpoint found, looking for next checkpoint", transformId); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index f243fdd563df0..c6d0949f774a5 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -328,10 +328,10 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis * * It only persists to cluster state if the value is different than what is currently held in memory. * @param shouldStopAtCheckpoint whether or not we should stop at the next checkpoint or not - * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to cluster state. + * @param shouldStopAtCheckpointListener the listener to return to when we have persisted the updated value to the state index. */ public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, - Runnable shouldStopAtCheckpointListener) { + ActionListener shouldStopAtCheckpointListener) { logger.debug("[{}] attempted to set task to stop at checkpoint [{}] with state [{}]", getTransformId(), shouldStopAtCheckpoint, @@ -341,24 +341,33 @@ public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoin getIndexer() == null || getIndexer().getState() == IndexerState.STOPPED || getIndexer().getState() == IndexerState.STOPPING) { - shouldStopAtCheckpointListener.run(); + shouldStopAtCheckpointListener.onResponse(null); return; } - this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; - DataFrameTransformState state = getState(); - persistStateToClusterState(state, + DataFrameTransformState state = new DataFrameTransformState( + taskState.get(), + indexer.get().getState(), + indexer.get().getPosition(), + currentCheckpoint.get(), + stateReason.get(), + getIndexer().getProgress(), + null, //Node attributes + shouldStopAtCheckpoint); + getIndexer().doSaveState(state, ActionListener.wrap( r -> { - logger.debug("[{}] successfully persisted to clusterstate should_stop_at_checkpoint update [{}]", + // We only want to update this internal value if it is persisted as such + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", getTransformId(), shouldStopAtCheckpoint); - shouldStopAtCheckpointListener.run(); + shouldStopAtCheckpointListener.onResponse(null); }, statsExc -> { - logger.warn("[{}] failed to persist to clusterstate should_stop_at_checkpoint update [{}]", + logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]", getTransformId(), shouldStopAtCheckpoint); - shouldStopAtCheckpointListener.run(); + shouldStopAtCheckpointListener.onFailure(statsExc); } )); } @@ -393,7 +402,6 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { } stateReason.set(null); - this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after). // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint, // or has yet to even start one. @@ -1010,57 +1018,63 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p null, shouldStopAtCheckpoint); logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); + doSaveState(state, ActionListener.wrap( + r -> next.run(), + e -> next.run() + )); + } - // This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine - SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = transformTask.getSeqNoPrimaryTermAndIndex(); - - // Persist the current state and stats in the internal index. The interval of this method being - // called is controlled by AsyncTwoPhaseIndexer#onBulkResponse which calls doSaveState every so - // often when doing bulk indexing calls or at the end of one indexing run. - transformsConfigManager.putOrUpdateTransformStoredDoc( - new DataFrameTransformStoredDoc(transformId, state, getStats()), - seqNoPrimaryTermAndIndex, - ActionListener.wrap( - r -> { - transformTask.updateSeqNoPrimaryTermAndIndex(seqNoPrimaryTermAndIndex, r); - // for auto stop shutdown the task - if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { - transformTask.shutdown(); - } - // Only do this clean up once, if it succeeded, no reason to do the query again. - if (oldStatsCleanedUp.compareAndSet(false, true)) { - transformsConfigManager.deleteOldTransformStoredDocuments(transformId, ActionListener.wrap( - nil -> { - logger.trace("[{}] deleted old transform stats and state document", transformId); - next.run(); - }, - e -> { - String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", - transformId); - logger.warn(msg, e); - // If we have failed, we should attempt the clean up again later - oldStatsCleanedUp.set(false); - next.run(); - } - )); - } else { - next.run(); - } + protected void doSaveState(DataFrameTransformState state, ActionListener listener) { + // This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = transformTask.getSeqNoPrimaryTermAndIndex(); + + // Persist the current state and stats in the internal index. The interval of this method being + // called is controlled by AsyncTwoPhaseIndexer#onBulkResponse which calls doSaveState every so + // often when doing bulk indexing calls or at the end of one indexing run. + transformsConfigManager.putOrUpdateTransformStoredDoc( + new DataFrameTransformStoredDoc(transformId, state, getStats()), + seqNoPrimaryTermAndIndex, + ActionListener.wrap( + r -> { + transformTask.updateSeqNoPrimaryTermAndIndex(seqNoPrimaryTermAndIndex, r); + // for auto stop shutdown the task + if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { + transformTask.shutdown(); + } + // Only do this clean up once, if it succeeded, no reason to do the query again. + if (oldStatsCleanedUp.compareAndSet(false, true)) { + transformsConfigManager.deleteOldTransformStoredDocuments(transformId, ActionListener.wrap( + nil -> { + logger.trace("[{}] deleted old transform stats and state document", transformId); + listener.onResponse(null); }, - statsExc -> { - logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.", - transformConfig.getId()), - statsExc); - auditor.warning(getJobId(), - "Failure updating stats of transform: " + statsExc.getMessage()); - // for auto stop shutdown the task - if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { - transformTask.shutdown(); - } - next.run(); + e -> { + String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", + transformId); + logger.warn(msg, e); + // If we have failed, we should attempt the clean up again later + oldStatsCleanedUp.set(false); + listener.onResponse(null); } - )); - } + )); + } else { + listener.onResponse(null); + } + }, + statsExc -> { + logger.error(new ParameterizedMessage("[{}] updating stats of transform failed.", + transformConfig.getId()), + statsExc); + auditor.warning(getJobId(), + "Failure updating stats of transform: " + statsExc.getMessage()); + // for auto stop shutdown the task + if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) { + transformTask.shutdown(); + } + listener.onFailure(statsExc); + } + )); + } @Override protected void onFailure(Exception exc) { @@ -1080,7 +1094,12 @@ protected void onFinish(ActionListener listener) { // This indicates an early exit since no changes were found. // So, don't treat this like a checkpoint being completed, as no work was done. if (hasSourceChanged == false) { + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + if (transformTask.shouldStopAtCheckpoint) { + stop(); + } listener.onResponse(null); + return; } // TODO: needs cleanup super is called with a listener, but listener.onResponse is called below // super.onFinish() fortunately ignores the listener From 10bd717b7e73996c5a3bd9fa5504846951ba8d77 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 3 Sep 2019 16:33:31 -0500 Subject: [PATCH 07/12] adding waitforcheckpoint integration test; --- .../integration/DataFramePivotRestIT.java | 41 ++++++++++++ .../integration/DataFrameRestTestCase.java | 6 +- .../transforms/DataFrameTransformTask.java | 63 ++++++++++--------- 3 files changed, 79 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index aca70d146fa3b..9b39f8ae22456 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -857,6 +857,47 @@ public void testManyBucketsWithSmallPageSize() throws Exception { assertEquals(101, ((List)XContentMapValues.extractValue("transforms.stats.pages_processed", stats)).get(0)); } + public void testContinuousStopWaitForCheckpoint() throws Exception { + String indexName = "continuous_reviews"; + createReviewsIndex(indexName); + String transformId = "simple_continuous_pivot"; + String dataFrameIndex = "pivot_reviews_continuous"; + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, dataFrameIndex); + final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, + BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + String config = "{" + + " \"source\": {\"index\":\"" + indexName + "\"}," + + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + + " \"frequency\": \"1s\"," + + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }" + + "}"; + createDataframeTransformRequest.setJsonEntity(config); + Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); + assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForContinuousTransform(transformId, dataFrameIndex, null); + assertTrue(indexExists(dataFrameIndex)); + stopDataFrameTransform(transformId, false, true); + + // get and check some users + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); + } + private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 455009b49691e..e85e197cc7611 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -240,10 +240,14 @@ protected void startDataframeTransform(String transformId, boolean force, String } protected void stopDataFrameTransform(String transformId, boolean force) throws Exception { - // start the transform + stopDataFrameTransform(transformId, force, false); + } + + protected void stopDataFrameTransform(String transformId, boolean force, boolean waitForCheckpoint) throws Exception { final Request stopTransformRequest = createRequestWithAuth("POST", DATAFRAME_ENDPOINT + transformId + "/_stop", null); stopTransformRequest.addParameter(DataFrameField.FORCE.getPreferredName(), Boolean.toString(force)); stopTransformRequest.addParameter(DataFrameField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true)); + stopTransformRequest.addParameter(DataFrameField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint)); Map stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest)); assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index c6d0949f774a5..3020948f40001 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -357,7 +357,7 @@ public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoin ActionListener.wrap( r -> { // We only want to update this internal value if it is persisted as such - this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + this.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", getTransformId(), shouldStopAtCheckpoint); @@ -402,18 +402,21 @@ public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { } stateReason.set(null); + // No reason to keep it in the potentially failed state. + // Since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be + // executed while we are wrapping up. + boolean wasFailed = taskState.compareAndSet(DataFrameTransformTaskState.FAILED, DataFrameTransformTaskState.STARTED); + // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after). - // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint, - // or has yet to even start one. - // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time). - // We should just stop the indexer - if (shouldStopAtCheckpoint == false || (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) { + // if it is false, stop immediately + if (shouldStopAtCheckpoint == false || + // If state was in a failed state, we should stop immediately as we will never reach the next checkpoint + wasFailed || + // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint, + // or has yet to even start one. + // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time). + (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) { IndexerState state = getIndexer().stop(); - // No reason to keep it in the potentially failed state. - // Since we have called `stop` against the indexer, we have no more fear of triggering again. - // But, since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be - // executed while we are wrapping up. - taskState.compareAndSet(DataFrameTransformTaskState.FAILED, DataFrameTransformTaskState.STARTED); if (state == IndexerState.STOPPED) { getIndexer().onStop(); getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); @@ -960,27 +963,9 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p return; } - // This means that the indexer was triggered to discover changes, found none, and exited early. - // If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes. - // Allow the stop call path to continue - if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) { - next.run(); - return; - } - DataFrameTransformTaskState taskState = transformTask.taskState.get(); boolean shouldStopAtCheckpoint = transformTask.shouldStopAtCheckpoint; - if (indexerState.equals(IndexerState.STARTED) - && transformTask.currentCheckpoint.get() == 1 - && this.isContinuous() == false) { - // set both to stopped so they are persisted as such - indexerState = IndexerState.STOPPED; - - auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop"); - logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId()); - } - // If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states // 1. We have just called `onFinish` completing our request, but `shouldStopAtCheckpoint` was set to `true` before our check // there and now @@ -994,6 +979,24 @@ protected void doSaveState(IndexerState indexerState, DataFrameIndexerPosition p transformConfig.getId()); } + if (indexerState.equals(IndexerState.STARTED) + && transformTask.currentCheckpoint.get() == 1 + && this.isContinuous() == false) { + // set both to stopped so they are persisted as such + indexerState = IndexerState.STOPPED; + + auditor.info(transformConfig.getId(), "Data frame finished indexing all data, initiating stop"); + logger.info("[{}] data frame transform finished indexing all data, initiating stop.", transformConfig.getId()); + } + + // This means that the indexer was triggered to discover changes, found none, and exited early. + // If the state is `STOPPED` this means that DataFrameTransformTask#stop was called while we were checking for changes. + // Allow the stop call path to continue + if (hasSourceChanged == false && indexerState.equals(IndexerState.STOPPED) == false) { + next.run(); + return; + } + // If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING` // OR we called `doSaveState` manually as the indexer was not actively running. // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state @@ -1074,7 +1077,7 @@ protected void doSaveState(DataFrameTransformState state, ActionListener l listener.onFailure(statsExc); } )); - } + } @Override protected void onFailure(Exception exc) { From ffd8acb2991ac842080f1c108425adc46ab4d556 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 5 Sep 2019 06:42:49 -0500 Subject: [PATCH 08/12] fixing stop logic --- .../dataframe/integration/DataFramePivotRestIT.java | 12 +++++++++--- .../TransportStopDataFrameTransformAction.java | 9 ++++++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 9b39f8ae22456..f459207d520e9 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -858,10 +858,16 @@ public void testManyBucketsWithSmallPageSize() throws Exception { } public void testContinuousStopWaitForCheckpoint() throws Exception { - String indexName = "continuous_reviews"; + Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); + addFailureRetrySetting.setJsonEntity( + "{\"transient\": {" + + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}"); + client().performRequest(addFailureRetrySetting); + String indexName = "continuous_reviews_wait_for_checkpoint"; createReviewsIndex(indexName); - String transformId = "simple_continuous_pivot"; - String dataFrameIndex = "pivot_reviews_continuous"; + String transformId = "simple_continuous_pivot_wait_for_checkpoint"; + String dataFrameIndex = "pivot_reviews_continuous_wait_for_checkpoint"; setupDataAccessRole(DATA_ACCESS_ROLE, indexName, dataFrameIndex); final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 9039bb30b3443..3614d82b153b4 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -142,7 +142,14 @@ protected void taskOperation(Request request, DataFrameTransformTask transformTa if (ids.contains(transformTask.getTransformId())) { transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap( - r -> transformTask.stop(request.isForce(), request.isWaitForCheckpoint()), + r -> { + try { + transformTask.stop(request.isForce(), request.isWaitForCheckpoint()); + listener.onResponse(new Response(true)); + } catch (ElasticsearchException ex) { + listener.onFailure(ex); + } + }, e -> listener.onFailure( new ElasticsearchStatusException( "Failed to update transform task [{}] state value should_stop_at_checkpoint from [{}] to [{}]", From 33a4096149d7c5ae5b902c3e03c4999d7d142d93 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 5 Sep 2019 06:43:21 -0500 Subject: [PATCH 09/12] fixing request name in test --- .../xpack/dataframe/integration/DataFramePivotRestIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index f459207d520e9..c346d6429f877 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -858,12 +858,12 @@ public void testManyBucketsWithSmallPageSize() throws Exception { } public void testContinuousStopWaitForCheckpoint() throws Exception { - Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); - addFailureRetrySetting.setJsonEntity( + Request updateLoggingLevels = new Request("PUT", "/_cluster/settings"); + updateLoggingLevels.setJsonEntity( "{\"transient\": {" + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}"); - client().performRequest(addFailureRetrySetting); + client().performRequest(updateLoggingLevels); String indexName = "continuous_reviews_wait_for_checkpoint"; createReviewsIndex(indexName); String transformId = "simple_continuous_pivot_wait_for_checkpoint"; From e8db75ca82e6ea18cd400039f0e94af138844cf1 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 5 Sep 2019 16:53:45 -0500 Subject: [PATCH 10/12] dont wait for tasks completion if there are failures --- .../xpack/dataframe/integration/DataFramePivotRestIT.java | 2 +- .../action/TransportStopDataFrameTransformAction.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index c346d6429f877..a05933796206e 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -894,7 +894,7 @@ public void testContinuousStopWaitForCheckpoint() throws Exception { startAndWaitForContinuousTransform(transformId, dataFrameIndex, null); assertTrue(indexExists(dataFrameIndex)); - stopDataFrameTransform(transformId, false, true); + assertBusy(() -> stopDataFrameTransform(transformId, false, true)); // get and check some users assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index 3614d82b153b4..ea73d546c73fa 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -197,6 +197,12 @@ private ActionListener waitForStopListener(Request request, ActionList ); return ActionListener.wrap( response -> { + // If there were failures attempting to stop the tasks, we don't know if they will actually stop. + // It is better to respond to the user now than allow for the persistent task waiting to timeout + if (response.getTaskFailures().isEmpty() == false || response.getNodeFailures().isEmpty() == false) { + listener.onResponse(response); + return; + } // Wait until the persistent task is stopped // Switch over to Generic threadpool so we don't block the network thread threadPool.generic().execute(() -> From 4a838498732d12a79e475f7b04a77c3e37669e48 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 6 Sep 2019 10:16:57 -0500 Subject: [PATCH 11/12] Building intelligible error from failures, fixing yml tests --- ...TransportStopDataFrameTransformAction.java | 45 +++++++++++- ...portStopDataFrameTransformActionTests.java | 68 +++++++++++++++++++ .../data_frame.stop_data_frame_transform.json | 5 ++ .../test/data_frame/transforms_start_stop.yml | 11 +++ .../test/data_frame/transforms_stats.yml | 2 + 5 files changed, 130 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index ea73d546c73fa..e5d4a39ab1635 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -46,6 +46,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.dataframe.DataFrameMessages.DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM; @@ -200,7 +202,8 @@ private ActionListener waitForStopListener(Request request, ActionList // If there were failures attempting to stop the tasks, we don't know if they will actually stop. // It is better to respond to the user now than allow for the persistent task waiting to timeout if (response.getTaskFailures().isEmpty() == false || response.getNodeFailures().isEmpty() == false) { - listener.onResponse(response); + RestStatus status = firstNotOKStatus(response.getTaskFailures(), response.getNodeFailures()); + listener.onFailure(buildException(response.getTaskFailures(), response.getNodeFailures(), status)); return; } // Wait until the persistent task is stopped @@ -212,6 +215,46 @@ private ActionListener waitForStopListener(Request request, ActionList ); } + static ElasticsearchStatusException buildException(List taskOperationFailures, + List elasticsearchExceptions, + RestStatus status) { + List exceptions = Stream.concat( + taskOperationFailures.stream().map(TaskOperationFailure::getCause), + elasticsearchExceptions.stream()).collect(Collectors.toList()); + + ElasticsearchStatusException elasticsearchStatusException = + new ElasticsearchStatusException(exceptions.get(0).getMessage(), status); + + for (int i = 1; i < exceptions.size(); i++) { + elasticsearchStatusException.addSuppressed(exceptions.get(i)); + } + return elasticsearchStatusException; + } + + static RestStatus firstNotOKStatus(List taskOperationFailures, List exceptions) { + RestStatus status = RestStatus.OK; + + for (TaskOperationFailure taskOperationFailure : taskOperationFailures) { + status = taskOperationFailure.getStatus(); + if (RestStatus.OK.equals(status) == false) { + break; + } + } + if (status == RestStatus.OK) { + for (ElasticsearchException exception : exceptions) { + // As it stands right now, this will ALWAYS be INTERNAL_SERVER_ERROR. + // FailedNodeException does not overwrite the `status()` method and the logic in ElasticsearchException + // Just returns an INTERNAL_SERVER_ERROR + status = exception.status(); + if (RestStatus.OK.equals(status) == false) { + break; + } + } + } + // If all the previous exceptions don't have a valid status, we have an unknown error. + return status == RestStatus.OK ? RestStatus.INTERNAL_SERVER_ERROR : status; + } + private void waitForDataFrameStopped(Set persistentTaskIds, TimeValue timeout, boolean force, diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java index 61fad63c83253..47b81a0e1b6a3 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformActionTests.java @@ -5,12 +5,15 @@ */ package org.elasticsearch.xpack.dataframe.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; @@ -18,8 +21,10 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.indexing.IndexerState; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.hamcrest.Matchers.equalTo; @@ -91,4 +96,67 @@ public void testTaskStateValidationWithDataFrameTasks() { "task has failed"))); } + public void testFirstNotOKStatus() { + List nodeFailures = new ArrayList<>(); + List taskOperationFailures = new ArrayList<>(); + + nodeFailures.add(new ElasticsearchException("nodefailure", + new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY))); + taskOperationFailures.add(new TaskOperationFailure("node", + 1, + new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST))); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()), + equalTo(RestStatus.BAD_REQUEST)); + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures), + equalTo(RestStatus.BAD_REQUEST)); + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus(taskOperationFailures, + Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK)))), + equalTo(RestStatus.BAD_REQUEST)); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus( + Collections.singletonList(new TaskOperationFailure( + "node", + 1, + new ElasticsearchStatusException("not failure", RestStatus.OK))), + nodeFailures), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + + assertThat(TransportStopDataFrameTransformAction.firstNotOKStatus( + Collections.emptyList(), + nodeFailures), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testBuildException() { + List nodeFailures = new ArrayList<>(); + List taskOperationFailures = new ArrayList<>(); + + nodeFailures.add(new ElasticsearchException("node failure")); + taskOperationFailures.add(new TaskOperationFailure("node", + 1, + new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST))); + + RestStatus status = CONFLICT; + ElasticsearchStatusException statusException = + TransportStopDataFrameTransformAction.buildException(taskOperationFailures, nodeFailures, status); + + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(1)); + + statusException = TransportStopDataFrameTransformAction.buildException(Collections.emptyList(), nodeFailures, status); + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(nodeFailures.get(0).getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(0)); + + statusException = TransportStopDataFrameTransformAction.buildException(taskOperationFailures, Collections.emptyList(), status); + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(0)); + } + } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json index af0c35b156390..7dfed1e80d6f8 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.stop_data_frame_transform.json @@ -35,6 +35,11 @@ "type":"boolean", "required":false, "description":"Whether to ignore if a wildcard expression matches no data frame transforms. (This includes `_all` string or when no data frame transforms have been specified)" + }, + "wait_for_checkpoint": { + "type":"boolean", + "required":false, + "description":"Whether to wait for the transform to reach a checkpoint before stopping. Default to true" } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 044f5212a993c..55bf6bdf21c7c 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -30,6 +30,7 @@ setup: teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" timeout: "10m" wait_for_completion: true @@ -104,6 +105,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" wait_for_completion: true - match: { acknowledged: true } @@ -161,6 +163,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" wait_for_completion: true - match: { acknowledged: true } @@ -186,6 +189,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" wait_for_completion: true - match: { acknowledged: true } @@ -198,18 +202,21 @@ teardown: - do: catch: missing data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "missing-transform" --- "Test stop missing transform by expression": - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false allow_no_match: true transform_id: "missing-transform*" - do: catch: missing data_frame.stop_data_frame_transform: + wait_for_checkpoint: false allow_no_match: false transform_id: "missing-transform*" @@ -217,6 +224,7 @@ teardown: "Test stop already stopped transform": - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" - match: { acknowledged: true } @@ -260,6 +268,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" wait_for_completion: true - match: { acknowledged: true } @@ -274,6 +283,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-later" wait_for_completion: true - match: { acknowledged: true } @@ -308,6 +318,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "_all" wait_for_completion: true - match: { acknowledged: true } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index b4699898d4833..f01079b848874 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -33,6 +33,7 @@ setup: teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-stats" wait_for_completion: true @@ -252,6 +253,7 @@ teardown: - do: data_frame.stop_data_frame_transform: + wait_for_checkpoint: false transform_id: "airline-transform-stats-continuous" wait_for_completion: true From 58d133682a4cad081c83a69cb12cd40883f6b3b0 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 6 Sep 2019 11:28:17 -0500 Subject: [PATCH 12/12] updating assertion --- .../dataframe/integration/DataFramePivotRestIT.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index a05933796206e..4b7d90c9b7ca1 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.dataframe.integration; import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -894,7 +895,14 @@ public void testContinuousStopWaitForCheckpoint() throws Exception { startAndWaitForContinuousTransform(transformId, dataFrameIndex, null); assertTrue(indexExists(dataFrameIndex)); - assertBusy(() -> stopDataFrameTransform(transformId, false, true)); + assertBusy(() -> { + try { + stopDataFrameTransform(transformId, false, true); + } catch (ResponseException e) { + // We get a conflict sometimes depending on WHEN we try to write the state, should eventually pass though + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(200)); + } + }); // get and check some users assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417);