From 2cbf1f7d55cef533b5ebfabd86ab80e2628704da Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 11 Oct 2019 12:55:15 -0400 Subject: [PATCH 1/5] [ML][Transforms] add wait_for_checkpoint flag to stop --- .../client/TransformRequestConverters.java | 4 + .../transform/StopTransformRequest.java | 21 +++- .../org/elasticsearch/client/TransformIT.java | 4 +- .../TransformRequestConvertersTests.java | 13 ++- .../TransformDocumentationIT.java | 2 +- .../transform/apis/stop-transform.asciidoc | 4 + .../xpack/core/transform/TransformField.java | 1 + .../transform/action/StopTransformAction.java | 24 ++++- .../transform/transforms/TransformState.java | 102 ++++++++++++------ .../transforms/TransformTaskState.java | 1 + .../StopTransformActionRequestTests.java | 24 +++-- .../transforms/TransformStateTests.java | 26 ++++- .../api/transform.stop_transform.json | 31 +++--- .../test/transform/transforms_start_stop.yml | 12 +++ .../test/transform/transforms_stats.yml | 2 + .../transform/integration/TransformIT.java | 43 ++++++++ .../integration/TransformIntegTestCase.java | 12 ++- .../integration/TransformPivotRestIT.java | 56 ++++++++++ .../integration/TransformRestTestCase.java | 6 +- .../TransportDeleteTransformAction.java | 2 +- .../action/TransportStopTransformAction.java | 74 +++++++++++-- .../persistence/TransformInternalIndex.java | 5 + .../rest/action/RestStopTransformAction.java | 4 +- .../RestStopTransformActionDeprecated.java | 4 +- .../transforms/ClientTransformIndexer.java | 63 +++++++++-- .../ClientTransformIndexerBuilder.java | 11 +- .../TransformPersistentTasksExecutor.java | 7 +- .../transform/transforms/TransformTask.java | 100 ++++++++++++++--- .../TransportStopTransformActionTests.java | 68 ++++++++++++ .../ClientTransformIndexerTests.java | 3 +- 30 files changed, 627 insertions(+), 102 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/TransformRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/TransformRequestConverters.java index d7a44db3a5d22..4815353936b28 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/TransformRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/TransformRequestConverters.java @@ -41,6 +41,7 @@ import static org.elasticsearch.client.transform.DeleteTransformRequest.FORCE; import static org.elasticsearch.client.transform.GetTransformRequest.ALLOW_NO_MATCH; import static org.elasticsearch.client.transform.PutTransformRequest.DEFER_VALIDATION; +import static org.elasticsearch.client.transform.StopTransformRequest.WAIT_FOR_CHECKPOINT; final class TransformRequestConverters { @@ -135,6 +136,9 @@ static Request stopTransform(StopTransformRequest stopRequest) { if (stopRequest.getAllowNoMatch() != null) { request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString()); } + if (stopRequest.getWaitForCheckpoint() != null) { + request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString()); + } request.addParameters(params.asMap()); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopTransformRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopTransformRequest.java index 33fc356c8da36..33adc233102e5 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopTransformRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/StopTransformRequest.java @@ -28,21 +28,23 @@ public class StopTransformRequest implements Validatable { + public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint"; + private final String id; private Boolean waitForCompletion; + private Boolean waitForCheckpoint; private TimeValue timeout; private Boolean allowNoMatch; public StopTransformRequest(String id) { - this.id = id; - waitForCompletion = null; - timeout = null; + this(id, null, null, null); } - public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) { + public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) { this.id = id; this.waitForCompletion = waitForCompletion; this.timeout = timeout; + this.waitForCheckpoint = waitForCheckpoint; } public String getId() { @@ -73,6 +75,14 @@ public void setAllowNoMatch(Boolean allowNoMatch) { this.allowNoMatch = allowNoMatch; } + public Boolean getWaitForCheckpoint() { + return waitForCheckpoint; + } + + public void setWaitForCheckpoint(Boolean waitForCheckpoint) { + this.waitForCheckpoint = waitForCheckpoint; + } + @Override public Optional validate() { if (id == null) { @@ -86,7 +96,7 @@ public Optional validate() { @Override public int hashCode() { - return Objects.hash(id, waitForCompletion, timeout, allowNoMatch); + return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint); } @Override @@ -102,6 +112,7 @@ public boolean equals(Object obj) { return Objects.equals(this.id, other.id) && Objects.equals(this.waitForCompletion, other.waitForCompletion) && Objects.equals(this.timeout, other.timeout) + && Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint) && Objects.equals(this.allowNoMatch, other.allowNoMatch); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java index 23771c2bff92e..117bfdded6297 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java @@ -147,7 +147,7 @@ private void indexData(String indexName) throws IOException { public void cleanUpTransforms() throws Exception { for (String transformId : transformsToClean) { highLevelClient().transform().stopTransform( - new StopTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT); + new StopTransformRequest(transformId, Boolean.TRUE, null, false), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { @@ -310,7 +310,7 @@ public void testStartStop() throws IOException { assertThat(taskState, oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING, TransformStats.State.STOPPING, TransformStats.State.STOPPED)); - StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null); + StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null, false); StopTransformResponse stopResponse = execute(stopRequest, client::stopTransform, client::stopTransformAsync); assertTrue(stopResponse.isAcknowledged()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformRequestConvertersTests.java index f7c32652fff01..9d25f302b8c41 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/TransformRequestConvertersTests.java @@ -149,7 +149,12 @@ public void testStopDataFrameTransform() { if (randomBoolean()) { timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout"); } - StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue); + Boolean waitForCheckpoint = null; + if (randomBoolean()) { + waitForCheckpoint = randomBoolean(); + } + + StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint); Request request = TransformRequestConverters.stopTransform(stopRequest); assertEquals(HttpPost.METHOD_NAME, request.getMethod()); @@ -168,6 +173,12 @@ public void testStopDataFrameTransform() { } else { assertFalse(request.getParameters().containsKey("timeout")); } + if (waitForCheckpoint != null) { + assertTrue(request.getParameters().containsKey("wait_for_checkpoint")); + assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint"))); + } else { + assertFalse(request.getParameters().containsKey("wait_for_checkpoint")); + } assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH)); stopRequest.setAllowNoMatch(randomBoolean()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java index e27c36afd317a..5438d5dd554c5 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java @@ -81,7 +81,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase { public void cleanUpTransforms() throws Exception { for (String transformId : transformsToClean) { highLevelClient().transform().stopTransform( - new StopTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT); + new StopTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT); } for (String transformId : transformsToClean) { diff --git a/docs/reference/transform/apis/stop-transform.asciidoc b/docs/reference/transform/apis/stop-transform.asciidoc index c367c487b1f37..95f7008352641 100644 --- a/docs/reference/transform/apis/stop-transform.asciidoc +++ b/docs/reference/transform/apis/stop-transform.asciidoc @@ -90,6 +90,10 @@ are no matches or only partial matches. state completely stops. If set to `false`, the API returns immediately and the indexer will be stopped asynchronously in the background. Defaults to `false`. +`wait_for_checkpoint`:: + (Optional, boolean) If set to `true`, the transform will not completely stop + until the current checkpoint is completed. If set to `false`, the transform + stops as soon as possible. Defaults to `false`. [[stop-transform-response-codes]] ==== {api-response-codes-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index a82d3cc822dd6..fd81a2f0c63e8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -22,6 +22,7 @@ public final class TransformField { public static final ParseField GROUP_BY = new ParseField("group_by"); public static final ParseField TIMEOUT = new ParseField("timeout"); public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion"); + public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint"); public static final ParseField STATS_FIELD = new ParseField("stats"); public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java index 2bd70e1789a58..3a3d6134172bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java @@ -48,9 +48,15 @@ public static class Request extends BaseTasksRequest { private final boolean waitForCompletion; private final boolean force; private final boolean allowNoMatch; + private final boolean waitForCheckpoint; private Set expandedIds; - public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) { + public Request(String id, + boolean waitForCompletion, + boolean force, + @Nullable TimeValue timeout, + boolean allowNoMatch, + boolean waitForCheckpoint) { this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName()); this.waitForCompletion = waitForCompletion; this.force = force; @@ -58,6 +64,7 @@ public Request(String id, boolean waitForCompletion, boolean force, @Nullable Ti // use the timeout value already present in BaseTasksRequest this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout); this.allowNoMatch = allowNoMatch; + this.waitForCheckpoint = waitForCheckpoint; } public Request(StreamInput in) throws IOException { @@ -73,6 +80,11 @@ public Request(StreamInput in) throws IOException { } else { this.allowNoMatch = true; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.waitForCheckpoint = in.readBoolean(); + } else { + this.waitForCheckpoint = false; + } } public String getId() { @@ -99,6 +111,10 @@ public boolean isAllowNoMatch() { return allowNoMatch; } + public boolean isWaitForCheckpoint() { + return waitForCheckpoint; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -113,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeBoolean(allowNoMatch); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(waitForCheckpoint); + } } @Override @@ -123,7 +142,7 @@ public ActionRequestValidationException validate() { @Override public int hashCode() { // the base class does not implement hashCode, therefore we need to hash timeout ourselves - return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch); + return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint); } @Override @@ -146,6 +165,7 @@ public boolean equals(Object obj) { Objects.equals(waitForCompletion, other.waitForCompletion) && Objects.equals(force, other.force) && Objects.equals(expandedIds, other.expandedIds) && + Objects.equals(waitForCheckpoint, other.waitForCheckpoint) && allowNoMatch == other.allowNoMatch; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java index 29ae1fe3968c4..12a75621efa83 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java @@ -43,6 +43,9 @@ public class TransformState implements Task.Status, PersistentTaskState { @Nullable private NodeAttributes node; + // TODO: 8.x this needs to be deprecated and we move towards a STOPPING TASK_STATE + private final boolean shouldStopAtNextCheckpoint; + public static final ParseField TASK_STATE = new ParseField("task_state"); public static final ParseField INDEXER_STATE = new ParseField("indexer_state"); @@ -53,28 +56,38 @@ public class TransformState implements Task.Status, PersistentTaskState { public static final ParseField REASON = new ParseField("reason"); public static final ParseField PROGRESS = new ParseField("progress"); public static final ParseField NODE = new ParseField("node"); + public static final ParseField SHOULD_STOP_AT_NEXT_CHECKPOINT = new ParseField("should_stop_at_checkpoint"); + @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - true, - args -> { - TransformTaskState taskState = (TransformTaskState) args[0]; - IndexerState indexerState = (IndexerState) args[1]; - Map bwcCurrentPosition = (Map) args[2]; - TransformIndexerPosition transformIndexerPosition = (TransformIndexerPosition) args[3]; - - // BWC handling, translate current_position to position iff position isn't set - if (bwcCurrentPosition != null && transformIndexerPosition == null) { - transformIndexerPosition = new TransformIndexerPosition(bwcCurrentPosition, null); - } - - long checkpoint = (long) args[4]; - String reason = (String) args[5]; - TransformProgress progress = (TransformProgress) args[6]; - NodeAttributes node = (NodeAttributes) args[7]; - - return new TransformState(taskState, indexerState, transformIndexerPosition, checkpoint, reason, progress, node); - }); + true, + args -> { + TransformTaskState taskState = (TransformTaskState) args[0]; + IndexerState indexerState = (IndexerState) args[1]; + Map bwcCurrentPosition = (Map) args[2]; + TransformIndexerPosition transformIndexerPosition = (TransformIndexerPosition) args[3]; + + // BWC handling, translate current_position to position iff position isn't set + if (bwcCurrentPosition != null && transformIndexerPosition == null) { + transformIndexerPosition = new TransformIndexerPosition(bwcCurrentPosition, null); + } + + long checkpoint = (long) args[4]; + String reason = (String) args[5]; + TransformProgress progress = (TransformProgress) args[6]; + NodeAttributes node = (NodeAttributes) args[7]; + boolean shouldStopAtNextCheckpoint = args[8] == null ? false : (boolean)args[8]; + + return new TransformState(taskState, + indexerState, + transformIndexerPosition, + checkpoint, + reason, + progress, + node, + shouldStopAtNextCheckpoint); + }); static { PARSER.declareField(constructorArg(), p -> TransformTaskState.fromString(p.text()), TASK_STATE, ValueType.STRING); @@ -85,15 +98,17 @@ public class TransformState implements Task.Status, PersistentTaskState { PARSER.declareString(optionalConstructorArg(), REASON); PARSER.declareField(optionalConstructorArg(), TransformProgress.PARSER::apply, PROGRESS, ValueType.OBJECT); PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE, ValueType.OBJECT); + PARSER.declareBoolean(optionalConstructorArg(), SHOULD_STOP_AT_NEXT_CHECKPOINT); } public TransformState(TransformTaskState taskState, - IndexerState indexerState, - @Nullable TransformIndexerPosition position, - long checkpoint, - @Nullable String reason, - @Nullable TransformProgress progress, - @Nullable NodeAttributes node) { + IndexerState indexerState, + @Nullable TransformIndexerPosition position, + long checkpoint, + @Nullable String reason, + @Nullable TransformProgress progress, + @Nullable NodeAttributes node, + boolean shouldStopAtNextCheckpoint) { this.taskState = taskState; this.indexerState = indexerState; this.position = position; @@ -101,14 +116,25 @@ public TransformState(TransformTaskState taskState, this.reason = reason; this.progress = progress; this.node = node; + this.shouldStopAtNextCheckpoint = shouldStopAtNextCheckpoint; + } + + public TransformState(TransformTaskState taskState, + IndexerState indexerState, + @Nullable TransformIndexerPosition position, + long checkpoint, + @Nullable String reason, + @Nullable TransformProgress progress, + @Nullable NodeAttributes node) { + this(taskState, indexerState, position, checkpoint, reason, progress, node, false); } public TransformState(TransformTaskState taskState, - IndexerState indexerState, - @Nullable TransformIndexerPosition position, - long checkpoint, - @Nullable String reason, - @Nullable TransformProgress progress) { + IndexerState indexerState, + @Nullable TransformIndexerPosition position, + long checkpoint, + @Nullable String reason, + @Nullable TransformProgress progress) { this(taskState, indexerState, position, checkpoint, reason, progress, null); } @@ -129,6 +155,11 @@ public TransformState(StreamInput in) throws IOException { } else { node = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + shouldStopAtNextCheckpoint = in.readBoolean(); + } else { + shouldStopAtNextCheckpoint = false; + } } public TransformTaskState getTaskState() { @@ -164,6 +195,10 @@ public TransformState setNode(NodeAttributes node) { return this; } + public boolean shouldStopAtNextCheckpoint() { + return shouldStopAtNextCheckpoint; + } + public static TransformState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); @@ -190,6 +225,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (node != null) { builder.field(NODE.getPreferredName(), node); } + builder.field(SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName(), shouldStopAtNextCheckpoint); builder.endObject(); return builder; } @@ -214,6 +250,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeOptionalWriteable(node); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(shouldStopAtNextCheckpoint); + } } @Override @@ -234,12 +273,13 @@ public boolean equals(Object other) { this.checkpoint == that.checkpoint && Objects.equals(this.reason, that.reason) && Objects.equals(this.progress, that.progress) && + Objects.equals(this.shouldStopAtNextCheckpoint, that.shouldStopAtNextCheckpoint) && Objects.equals(this.node, that.node); } @Override public int hashCode() { - return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node); + return Objects.hash(taskState, indexerState, position, checkpoint, reason, progress, node, shouldStopAtNextCheckpoint); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java index e807dafc8b435..0049f4a6272a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java @@ -14,6 +14,7 @@ import java.util.Locale; public enum TransformTaskState implements Writeable { + // TODO 8.x add a `STOPPING` state and BWC handling in ::fromString STOPPED, STARTED, FAILED; public static TransformTaskState fromString(String name) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java index 40852fdf34e89..d5d75d2d34a03 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/StopTransformActionRequestTests.java @@ -24,7 +24,12 @@ public class StopTransformActionRequestTests extends AbstractWireSerializingTest @Override protected Request createTestInstance() { TimeValue timeout = randomBoolean() ? TimeValue.timeValueMinutes(randomIntBetween(1, 10)) : null; - Request request = new Request(randomAlphaOfLengthBetween(1, 10), randomBoolean(), randomBoolean(), timeout, randomBoolean()); + Request request = new Request(randomAlphaOfLengthBetween(1, 10), + randomBoolean(), + randomBoolean(), + timeout, + randomBoolean(), + randomBoolean()); if (randomBoolean()) { request.setExpandedIds(new HashSet<>(Arrays.asList(generateRandomStringArray(5, 6, false)))); } @@ -41,9 +46,10 @@ public void testSameButDifferentTimeout() { boolean waitForCompletion = randomBoolean(); boolean force = randomBoolean(); boolean allowNoMatch = randomBoolean(); + boolean waitForCheckpoint = randomBoolean(); - Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch); - Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch); + Request r1 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(10), allowNoMatch, waitForCheckpoint); + Request r2 = new Request(id, waitForCompletion, force, TimeValue.timeValueSeconds(20), allowNoMatch, waitForCheckpoint); assertNotEquals(r1,r2); assertNotEquals(r1.hashCode(),r2.hashCode()); @@ -53,20 +59,20 @@ public void testMatch() { String dataFrameId = "dataframe-id"; Task dataFrameTask = new Task(1L, "persistent", "action", - TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId, - TaskId.EMPTY_TASK_ID, Collections.emptyMap()); + TransformField.PERSISTENT_TASK_DESCRIPTION_PREFIX + dataFrameId, + TaskId.EMPTY_TASK_ID, Collections.emptyMap()); - Request request = new Request("unrelated", false, false, null, false); + Request request = new Request("unrelated", false, false, null, false, false); request.setExpandedIds(Set.of("foo", "bar")); assertFalse(request.match(dataFrameTask)); - Request matchingRequest = new Request(dataFrameId, false, false, null, false); + Request matchingRequest = new Request(dataFrameId, false, false, null, false, false); matchingRequest.setExpandedIds(Set.of(dataFrameId)); assertTrue(matchingRequest.match(dataFrameTask)); Task notADataFrameTask = new Task(1L, "persistent", "action", - "some other task, say monitoring", - TaskId.EMPTY_TASK_ID, Collections.emptyMap()); + "some other task, say monitoring", + TaskId.EMPTY_TASK_ID, Collections.emptyMap()); assertFalse(matchingRequest.match(notADataFrameTask)); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java index 3bd90944894a5..a8435e649c1f7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStateTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.core.transform.transforms; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -26,7 +29,8 @@ public static TransformState randomDataFrameTransformState() { randomLongBetween(0,10), randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomTransformProgress(), - randomBoolean() ? null : randomNodeAttributes()); + randomBoolean() ? null : randomNodeAttributes(), + randomBoolean()); } @Override @@ -53,4 +57,24 @@ protected boolean supportsUnknownFields() { protected Predicate getRandomFieldsExcludeFilter() { return field -> !field.isEmpty(); } + + public void testBackwardsSerialization() throws IOException { + TransformState state = new TransformState(randomFrom(TransformTaskState.values()), + randomFrom(IndexerState.values()), + TransformIndexerPositionTests.randomTransformIndexerPosition(), + randomLongBetween(0,10), + randomBoolean() ? null : randomAlphaOfLength(10), + randomBoolean() ? null : randomTransformProgress(), + randomBoolean() ? null : randomNodeAttributes(), + false); // Will be false after BWC deserialization + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_5_0); + state.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_7_5_0); + TransformState streamedState = new TransformState(in); + assertEquals(state, streamedState); + } + } + } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json index 3ad86f6245f02..07867e14b09ad 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json @@ -20,21 +20,26 @@ } ] }, - "params":{ - "wait_for_completion":{ - "type":"boolean", - "required":false, - "description":"Whether to wait for the transform to fully stop before returning or not. Default to false" + "params": { + "wait_for_completion": { + "type": "boolean", + "required": false, + "description": "Whether to wait for the transform to fully stop before returning or not. Default to false" }, - "timeout":{ - "type":"time", - "required":false, - "description":"Controls the time to wait until the transform has stopped. Default to 30 seconds" + "timeout": { + "type": "time", + "required": false, + "description": "Controls the time to wait until the transform has stopped. Default to 30 seconds" }, - "allow_no_match":{ - "type":"boolean", - "required":false, - "description":"Whether to ignore if a wildcard expression matches no transforms. (This includes `_all` string or when no transforms have been specified)" + "allow_no_match": { + "type": "boolean", + "required": false, + "description": "Whether to ignore if a wildcard expression matches no transforms. (This includes `_all` string or when no transforms have been specified)" + }, + "wait_for_checkpoint": { + "type": "boolean", + "required": false, + "description": "Whether to wait for the transform to reach a checkpoint before stopping. Default to false" } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml index 264952c5d0251..7d5f83ee4fd16 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml @@ -51,6 +51,7 @@ setup: teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" timeout: "10m" wait_for_completion: true @@ -59,6 +60,7 @@ teardown: transform_id: "airline-transform-start-stop" - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" timeout: "10m" wait_for_completion: true @@ -131,6 +133,7 @@ teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" wait_for_completion: true - match: { acknowledged: true } @@ -168,6 +171,7 @@ teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" wait_for_completion: true - match: { acknowledged: true } @@ -193,6 +197,7 @@ teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" wait_for_completion: true - match: { acknowledged: true } @@ -201,18 +206,21 @@ teardown: - do: catch: missing transform.stop_transform: + wait_for_checkpoint: false transform_id: "missing-transform" --- "Test stop missing transform by expression": - do: transform.stop_transform: + wait_for_checkpoint: false allow_no_match: true transform_id: "missing-transform*" - do: catch: missing transform.stop_transform: + wait_for_checkpoint: false allow_no_match: false transform_id: "missing-transform*" @@ -220,6 +228,7 @@ teardown: "Test stop already stopped transform": - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop" - match: { acknowledged: true } @@ -269,6 +278,7 @@ teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-stop-continuous" wait_for_completion: true - match: { acknowledged: true } @@ -283,6 +293,7 @@ teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-start-later" wait_for_completion: true - match: { acknowledged: true } @@ -317,6 +328,7 @@ teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "_all" wait_for_completion: true - match: { acknowledged: true } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml index 5f4c11f00d088..fe38576309186 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_stats.yml @@ -33,6 +33,7 @@ setup: teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-stats" wait_for_completion: true @@ -252,6 +253,7 @@ teardown: - do: transform.stop_transform: + wait_for_checkpoint: false transform_id: "airline-transform-stats-continuous" wait_for_completion: true diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index 46c53b8204b2d..9e3c8ff0019d2 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -235,6 +235,49 @@ public void testContinuousTransformUpdate() throws Exception { deleteTransform(config.getId()); } + public void testStopWaitForCheckpoint() throws Exception { + String indexName = "wait-for-checkpoint-reviews"; + String transformId = "data-frame-transform-wait-for-checkpoint"; + createReviewsIndex(indexName, 1000); + + Map groups = new HashMap<>(); + groups.put("by-day", createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null)); + groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); + groups.put("by-business", TermsGroupSource.builder().setField("business_id").build()); + + AggregatorFactories.Builder aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + TransformConfig config = createTransformConfigBuilder(transformId, + groups, + aggs, + "reviews-by-user-business-day", + QueryBuilders.matchAllQuery(), + indexName) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .build(); + + assertTrue(putTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + assertTrue(startTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + + // waitForCheckpoint: true should make the transform continue until we hit the first checkpoint, then it will stop + stopTransform(transformId, false, null, true); + + // Wait until the first checkpoint + waitUntilCheckpoint(config.getId(), 1L); + + // Even though we are continuous, we should be stopped now as we needed to stop at the first checkpoint + assertBusy(() -> { + TransformStats stateAndStats = getTransformStats(config.getId()).getTransformsStats().get(0); + assertThat(stateAndStats.getState(), equalTo(TransformStats.State.STOPPED)); + assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(1000L)); + }); + + stopTransform(config.getId()); + deleteTransform(config.getId()); + } + private void indexMoreDocs(long timestamp, long userId, String index) throws Exception { BulkRequest bulk = new BulkRequest(index); for (int i = 0; i < 25; i++) { diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java index 0322ec8d9873e..53da3ee03f1aa 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformIntegTestCase.java @@ -86,8 +86,16 @@ protected void cleanUpTransforms() throws IOException { } protected StopTransformResponse stopTransform(String id) throws IOException { + return stopTransform(id, true, null, false); + } + + protected StopTransformResponse stopTransform(String id, + boolean waitForCompletion, + TimeValue timeout, + boolean waitForCheckpoint) throws IOException { RestHighLevelClient restClient = new TestRestHighLevelClient(); - return restClient.transform().stopTransform(new StopTransformRequest(id, true, null), RequestOptions.DEFAULT); + return restClient.transform() + .stopTransform(new StopTransformRequest(id, waitForCompletion, timeout, waitForCheckpoint), RequestOptions.DEFAULT); } protected StartTransformResponse startTransform(String id, RequestOptions options) throws IOException { @@ -298,7 +306,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws Exceptio .append("\"}"); bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); - if (i % 50 == 0) { + if (i % 100 == 0) { BulkResponse response = restClient.bulk(bulk, RequestOptions.DEFAULT); assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); bulk = new BulkRequest(indexName); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index c47a191b19ff6..16a559d5b5c68 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.integration; import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -863,6 +864,61 @@ public void testManyBucketsWithSmallPageSize() throws Exception { assertEquals(101, ((List)XContentMapValues.extractValue("transforms.stats.pages_processed", stats)).get(0)); } + public void testContinuousStopWaitForCheckpoint() throws Exception { + Request updateLoggingLevels = new Request("PUT", "/_cluster/settings"); + updateLoggingLevels.setJsonEntity( + "{\"transient\": {" + + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"); + client().performRequest(updateLoggingLevels); + String indexName = "continuous_reviews_wait_for_checkpoint"; + createReviewsIndex(indexName); + String transformId = "simple_continuous_pivot_wait_for_checkpoint"; + String dataFrameIndex = "pivot_reviews_continuous_wait_for_checkpoint"; + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, dataFrameIndex); + final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS); + String config = "{" + + " \"source\": {\"index\":\"" + indexName + "\"}," + + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + + " \"frequency\": \"1s\"," + + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }" + + "}"; + createDataframeTransformRequest.setJsonEntity(config); + Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); + assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForContinuousTransform(transformId, dataFrameIndex, null); + assertTrue(indexExists(dataFrameIndex)); + assertBusy(() -> { + try { + stopTransform(transformId,false, true); + } catch (ResponseException e) { + // We get a conflict sometimes depending on WHEN we try to write the state, should eventually pass though + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(200)); + } + }); + + // get and check some users + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_5", 3.72); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_11", 3.846153846); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_20", 3.769230769); + assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); + deleteIndex(indexName); + } + private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index c98ea5c57b7f9..bf10e3976e983 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -260,10 +260,14 @@ protected void startDataframeTransform(String transformId, String authHeader, St } protected void stopTransform(String transformId, boolean force) throws Exception { - // start the transform + stopTransform(transformId, force, false); + } + + protected void stopTransform(String transformId, boolean force, boolean waitForCheckpoint) throws Exception { final Request stopTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_stop", null); stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force)); stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true)); + stopTransformRequest.addParameter(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint)); Map stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest)); assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java index c9d97213f5402..a2a5d3bfd1c87 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java @@ -92,7 +92,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, - new StopTransformAction.Request(request.getId(), true, true, null, true), + new StopTransformAction.Request(request.getId(), true, true, null, true, false), ActionListener.wrap( r -> stopTransformActionListener.onResponse(null), stopTransformActionListener::onFailure)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index 088e8026153a6..a428adb446e64 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -46,6 +46,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_STOP_FAILED_TRANSFORM; @@ -151,13 +153,24 @@ protected void taskOperation(Request request, TransformTask transformTask, Actio } if (ids.contains(transformTask.getTransformId())) { - try { - transformTask.stop(request.isForce()); - } catch (ElasticsearchException ex) { - listener.onFailure(ex); - return; - } - listener.onResponse(new Response(Boolean.TRUE)); + transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap( + r -> { + try { + transformTask.stop(request.isForce(), request.isWaitForCheckpoint()); + listener.onResponse(new Response(true)); + } catch (ElasticsearchException ex) { + listener.onFailure(ex); + } + }, + e -> listener.onFailure( + new ElasticsearchStatusException( + "Failed to update transform task [{}] state value should_stop_at_checkpoint from [{}] to [{}]", + RestStatus.CONFLICT, + transformTask.getTransformId(), + transformTask.getState().shouldStopAtNextCheckpoint(), + request.isWaitForCheckpoint())) + ) + ); } else { listener.onFailure(new RuntimeException("ID of transform task [" + transformTask.getTransformId() + "] does not match request's ID [" + request.getId() + "]")); @@ -196,6 +209,13 @@ private ActionListener waitForStopListener(Request request, ActionList ); return ActionListener.wrap( response -> { + // If there were failures attempting to stop the tasks, we don't know if they will actually stop. + // It is better to respond to the user now than allow for the persistent task waiting to timeout + if (response.getTaskFailures().isEmpty() == false || response.getNodeFailures().isEmpty() == false) { + RestStatus status = firstNotOKStatus(response.getTaskFailures(), response.getNodeFailures()); + listener.onFailure(buildException(response.getTaskFailures(), response.getNodeFailures(), status)); + return; + } // Wait until the persistent task is stopped // Switch over to Generic threadpool so we don't block the network thread threadPool.generic().execute(() -> @@ -205,6 +225,46 @@ private ActionListener waitForStopListener(Request request, ActionList ); } + static ElasticsearchStatusException buildException(List taskOperationFailures, + List elasticsearchExceptions, + RestStatus status) { + List exceptions = Stream.concat( + taskOperationFailures.stream().map(TaskOperationFailure::getCause), + elasticsearchExceptions.stream()).collect(Collectors.toList()); + + ElasticsearchStatusException elasticsearchStatusException = + new ElasticsearchStatusException(exceptions.get(0).getMessage(), status); + + for (int i = 1; i < exceptions.size(); i++) { + elasticsearchStatusException.addSuppressed(exceptions.get(i)); + } + return elasticsearchStatusException; + } + + static RestStatus firstNotOKStatus(List taskOperationFailures, List exceptions) { + RestStatus status = RestStatus.OK; + + for (TaskOperationFailure taskOperationFailure : taskOperationFailures) { + status = taskOperationFailure.getStatus(); + if (RestStatus.OK.equals(status) == false) { + break; + } + } + if (status == RestStatus.OK) { + for (ElasticsearchException exception : exceptions) { + // As it stands right now, this will ALWAYS be INTERNAL_SERVER_ERROR. + // FailedNodeException does not overwrite the `status()` method and the logic in ElasticsearchException + // Just returns an INTERNAL_SERVER_ERROR + status = exception.status(); + if (RestStatus.OK.equals(status) == false) { + break; + } + } + } + // If all the previous exceptions don't have a valid status, we have an unknown error. + return status == RestStatus.OK ? RestStatus.INTERNAL_SERVER_ERROR : status; + } + private void waitForTransformStopped(Set persistentTaskIds, TimeValue timeout, boolean force, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index 934c2508de3f2..274ddfaaf7a6a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -54,6 +54,7 @@ public final class TransformInternalIndex { * progress::docs_processed, progress::docs_indexed, * stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed, * stats::exponential_avg_documents_processed + * version 3 (7.6): state::should_stop_at_checkpoint */ // constants for mappings @@ -71,6 +72,7 @@ public final class TransformInternalIndex { public static final String DOUBLE = "double"; public static final String LONG = "long"; public static final String KEYWORD = "keyword"; + public static final String BOOLEAN = "boolean"; public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException { IndexTemplateMetaData transformTemplate = IndexTemplateMetaData.builder(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME) @@ -175,6 +177,9 @@ private static XContentBuilder addTransformStoredDocMappings(XContentBuilder bui .startObject(TransformState.INDEXER_STATE.getPreferredName()) .field(TYPE, KEYWORD) .endObject() + .startObject(TransformState.SHOULD_STOP_AT_NEXT_CHECKPOINT.getPreferredName()) + .field(TYPE, BOOLEAN) + .endObject() .startObject(TransformState.CURRENT_POSITION.getPreferredName()) .field(ENABLED, false) .endObject() diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopTransformAction.java index b038ea79d991f..85d138bfc37fd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStopTransformAction.java @@ -28,13 +28,15 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient boolean waitForCompletion = restRequest.paramAsBoolean(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), false); boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false); boolean allowNoMatch = restRequest.paramAsBoolean(TransformField.ALLOW_NO_MATCH.getPreferredName(), false); + boolean waitForCheckpoint = restRequest.paramAsBoolean(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), false); StopTransformAction.Request request = new StopTransformAction.Request(id, waitForCompletion, force, timeout, - allowNoMatch); + allowNoMatch, + waitForCheckpoint); return channel -> client.execute(StopTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/compat/RestStopTransformActionDeprecated.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/compat/RestStopTransformActionDeprecated.java index 29f59397b8a3d..2a43df2a7f94d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/compat/RestStopTransformActionDeprecated.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/compat/RestStopTransformActionDeprecated.java @@ -36,13 +36,15 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient boolean waitForCompletion = restRequest.paramAsBoolean(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), false); boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false); boolean allowNoMatch = restRequest.paramAsBoolean(TransformField.ALLOW_NO_MATCH.getPreferredName(), false); + boolean waitForCheckpoint = restRequest.paramAsBoolean(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), false); StopTransformAction.Request request = new StopTransformAction.Request(id, waitForCompletion, force, timeout, - allowNoMatch); + allowNoMatch, + waitForCheckpoint); return channel -> client.execute(StopTransformActionDeprecated.INSTANCE, request, new RestToXContentListener<>(channel)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 76f654678baa9..c59cba74cec65 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -64,6 +64,8 @@ class ClientTransformIndexer extends TransformIndexer { // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); + // TODO should be obviated in 8.x with TransformTaskState::STOPPING + private volatile boolean shouldStopAtCheckpoint = false; private volatile Instant changesLastDetectedAt; ClientTransformIndexer(TransformConfigManager transformsConfigManager, @@ -78,7 +80,8 @@ class ClientTransformIndexer extends TransformIndexer { TransformProgress transformProgress, TransformCheckpoint lastCheckpoint, TransformCheckpoint nextCheckpoint, - TransformTask parentTask) { + TransformTask parentTask, + boolean shouldStopAtCheckpoint) { super(ExceptionsHelper.requireNonNull(parentTask, "parentTask") .getThreadPool() .executor(ThreadPool.Names.GENERIC), @@ -97,6 +100,15 @@ class ClientTransformIndexer extends TransformIndexer { this.client = ExceptionsHelper.requireNonNull(client, "client"); this.transformTask = parentTask; this.failureCount = new AtomicInteger(0); + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + } + + boolean shouldStopAtCheckpoint() { + return shouldStopAtCheckpoint; + } + + void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; } @Override @@ -297,6 +309,21 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p return; } + boolean shouldStopAtCheckpoint = shouldStopAtCheckpoint(); + + // If we should stop at the next checkpoint, are STARTED, and with `initialRun()` we are in one of two states + // 1. We have just called `onFinish` completing our request, but `shouldStopAtCheckpoint` was set to `true` before our check + // there and now + // 2. We are on the very first run of a NEW checkpoint and got here either through a failure, or the very first save state call. + // + // In either case, we should stop so that we guarantee a consistent state and that there are no partially completed checkpoints + if (shouldStopAtCheckpoint && initialRun() && indexerState.equals(IndexerState.STARTED)) { + indexerState = IndexerState.STOPPED; + auditor.info(transformConfig.getId(), "Transform is no longer in the middle of a checkpoint, initiating stop."); + logger.info("[{}] transform is no longer in the middle of a checkpoint, initiating stop.", + transformConfig.getId()); + } + // This means that the indexer was triggered to discover changes, found none, and exited early. // If the state is `STOPPED` this means that TransformTask#stop was called while we were checking for changes. // Allow the stop call path to continue @@ -321,6 +348,12 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p // OR we called `doSaveState` manually as the indexer was not actively running. // Since we save the state to an index, we should make sure that our task state is in parity with the indexer state if (indexerState.equals(IndexerState.STOPPED)) { + // If we are going to stop after the state is saved, we should NOT persist `shouldStopAtCheckpoint: true` as this may + // cause problems if the task starts up again. + // Additionally, we don't have to worry about inconsistency with the ClusterState (if it is persisted there) as the + // when we stop, we mark the task as complete and that state goes away. + shouldStopAtCheckpoint = false; + // We don't want adjust the stored taskState because as soon as it is `STOPPED` a user could call // .start again. taskState = TransformTaskState.STOPPED; @@ -332,9 +365,19 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p position, transformTask.getCheckpoint(), transformTask.getStateReason(), - getProgress()); + getProgress(), + null, + shouldStopAtCheckpoint); logger.debug("[{}] updating persistent state of transform to [{}].", transformConfig.getId(), state.toString()); + doSaveState(state, ActionListener.wrap( + r -> next.run(), + e -> next.run() + )); + } + + protected void doSaveState(TransformState state, ActionListener listener) { + // This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = transformTask.getSeqNoPrimaryTermAndIndex(); @@ -356,7 +399,7 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p transformsConfigManager.deleteOldTransformStoredDocuments(getJobId(), ActionListener.wrap( nil -> { logger.trace("[{}] deleted old transform stats and state document", getJobId()); - next.run(); + listener.onResponse(null); }, e -> { String msg = LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", @@ -364,11 +407,11 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p logger.warn(msg, e); // If we have failed, we should attempt the clean up again later oldStatsCleanedUp.set(false); - next.run(); + listener.onResponse(null); } )); } else { - next.run(); + listener.onResponse(null); } }, statsExc -> { @@ -381,7 +424,7 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p if (state.getTaskState().equals(TransformTaskState.STOPPED)) { transformTask.shutdown(); } - next.run(); + listener.onFailure(statsExc); } )); } @@ -404,6 +447,10 @@ protected void onFinish(ActionListener listener) { // This indicates an early exit since no changes were found. // So, don't treat this like a checkpoint being completed, as no work was done. if (hasSourceChanged == false) { + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + if (shouldStopAtCheckpoint) { + stop(); + } listener.onResponse(null); return; } @@ -447,6 +494,10 @@ protected void onFinish(ActionListener listener) { logger.debug( "[{}] finished indexing for transform checkpoint [{}].", getJobId(), checkpoint); auditBulkFailures = true; + // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING + if (shouldStopAtCheckpoint) { + stop(); + } listener.onResponse(null); } catch (Exception e) { listener.onFailure(e); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index f60bada209f34..ca3c54c44cfa6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -34,6 +34,7 @@ class ClientTransformIndexerBuilder { private TransformProgress progress; private TransformCheckpoint lastCheckpoint; private TransformCheckpoint nextCheckpoint; + private boolean shouldStopAtCheckpoint; ClientTransformIndexerBuilder() { this.initialStats = new TransformIndexerStats(); @@ -54,7 +55,13 @@ ClientTransformIndexer build(TransformTask parentTask) { this.progress, this.lastCheckpoint, this.nextCheckpoint, - parentTask); + parentTask, + this.shouldStopAtCheckpoint); + } + + ClientTransformIndexerBuilder setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + return this; } ClientTransformIndexerBuilder setClient(Client client) { @@ -120,4 +127,4 @@ ClientTransformIndexerBuilder setNextCheckpoint(TransformCheckpoint nextCheckpoi this.nextCheckpoint = nextCheckpoint; return this; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 64b299182d263..1038b2ae1f615 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -205,16 +205,19 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // Since we have not set the value for this yet, it SHOULD be null buildTask.updateSeqNoPrimaryTermAndIndex(null, seqNoPrimaryTermAndIndex); logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString()); + TransformState transformState = stateAndStats.getTransformState(); indexerBuilder.setInitialStats(stateAndStats.getTransformStats()) .setInitialPosition(stateAndStats.getTransformState().getPosition()) .setProgress(stateAndStats.getTransformState().getProgress()) - .setIndexerState(currentIndexerState(stateAndStats.getTransformState())); + .setIndexerState(currentIndexerState(transformState)) + // TODO should be obviated in 8.x with TransformTaskState::STOPPING + .setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint()); logger.debug("[{}] Loading existing state: [{}], position [{}]", transformId, stateAndStats.getTransformState(), stateAndStats.getTransformState().getPosition()); - stateHolder.set(stateAndStats.getTransformState()); + stateHolder.set(transformState); final long lastCheckpoint = stateHolder.get().getCheckpoint(); if (lastCheckpoint == 0) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 75054e949bf69..ca7ed16afcd7a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -141,7 +141,9 @@ public TransformState getState() { initialPosition, currentCheckpoint.get(), stateReason.get(), - null); + null, + null, + false); } else { return new TransformState( taskState.get(), @@ -149,7 +151,9 @@ public TransformState getState() { indexer.get().getPosition(), currentCheckpoint.get(), stateReason.get(), - getIndexer().getProgress()); + getIndexer().getProgress(), + null, + getIndexer().shouldStopAtCheckpoint()); } } @@ -247,7 +251,9 @@ synchronized void start(Long startingCheckpoint, ActionListener shouldStopAtCheckpointListener) { + logger.debug("[{}] attempted to set task to stop at checkpoint [{}] with state [{}]", + getTransformId(), + shouldStopAtCheckpoint, + getState()); + if (taskState.get() == TransformTaskState.STARTED == false || + getIndexer() == null || + getIndexer().shouldStopAtCheckpoint() == shouldStopAtCheckpoint || + getIndexer().getState() == IndexerState.STOPPED || + getIndexer().getState() == IndexerState.STOPPING) { + shouldStopAtCheckpointListener.onResponse(null); + return; + } + TransformState state = new TransformState( + taskState.get(), + indexer.get().getState(), + indexer.get().getPosition(), + currentCheckpoint.get(), + stateReason.get(), + getIndexer().getProgress(), + null, //Node attributes + shouldStopAtCheckpoint); + getIndexer().doSaveState(state, + ActionListener.wrap( + r -> { + // We only want to update this internal value if it is persisted as such + getIndexer().setShouldStopAtCheckpoint(shouldStopAtCheckpoint); + logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", + getTransformId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.onResponse(null); + }, + statsExc -> { + logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]", + getTransformId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.onFailure(statsExc); + } + )); + } + + public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { + logger.debug("[{}] stop called with force [{}], shouldStopAtCheckpoint [{}], state [{}]", + getTransformId(), + force, + shouldStopAtCheckpoint, + getState()); if (getIndexer() == null) { // If there is no indexer the task has not been triggered // but it still needs to be stopped and removed @@ -296,16 +357,23 @@ public synchronized void stop(boolean force) { RestStatus.CONFLICT); } - IndexerState state = getIndexer().stop(); stateReason.set(null); // No reason to keep it in the potentially failed state. - // Since we have called `stop` against the indexer, we have no more fear of triggering again. - // But, since `doSaveState` is asynchronous, it is best to set the state as STARTED so that another `start` call cannot be - // executed while we are wrapping up. - taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED); - if (state == IndexerState.STOPPED) { - getIndexer().onStop(); - getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); + boolean wasFailed = taskState.compareAndSet(TransformTaskState.FAILED, TransformTaskState.STARTED); + // shouldStopAtCheckpoint only comes into play when onFinish is called (or doSaveState right after). + // if it is false, stop immediately + if (shouldStopAtCheckpoint == false || + // If state was in a failed state, we should stop immediately as we will never reach the next checkpoint + wasFailed || + // If the indexerState is STARTED and it is on an initialRun, that means that the indexer has previously finished a checkpoint, + // or has yet to even start one. + // Either way, this means that we won't get to have onFinish called down stream (or at least won't for some time). + (getIndexer().getState() == IndexerState.STARTED && getIndexer().initialRun())) { + IndexerState state = getIndexer().stop(); + if (state == IndexerState.STOPPED) { + getIndexer().onStop(); + getIndexer().doSaveState(state, getIndexer().getPosition(), () -> {}); + } } } @@ -400,6 +468,12 @@ synchronized void markAsFailed(String reason, ActionListener listener) { // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. deregisterSchedulerJob(); + // The idea of stopping at the next checkpoint is no longer valid. Since a failed task could potentially START again, + // we should set this flag to false. + if (getIndexer() != null) { + getIndexer().setShouldStopAtCheckpoint(false); + } + // The end user should see that the task is in a failed state, and attempt to stop it again but with force=true taskState.set(TransformTaskState.FAILED); stateReason.set(reason); TransformState newState = getState(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java index 9fcc44d7389e5..0ca86c3657f62 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java @@ -5,12 +5,15 @@ */ package org.elasticsearch.xpack.transform.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; +import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformMessages; @@ -18,8 +21,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.elasticsearch.rest.RestStatus.CONFLICT; import static org.hamcrest.Matchers.equalTo; @@ -91,4 +96,67 @@ public void testTaskStateValidationWithTransformTasks() { "task has failed"))); } + public void testFirstNotOKStatus() { + List nodeFailures = new ArrayList<>(); + List taskOperationFailures = new ArrayList<>(); + + nodeFailures.add(new ElasticsearchException("nodefailure", + new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY))); + taskOperationFailures.add(new TaskOperationFailure("node", + 1, + new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST))); + + assertThat(TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + + assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()), + equalTo(RestStatus.BAD_REQUEST)); + assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures), + equalTo(RestStatus.BAD_REQUEST)); + assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, + Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK)))), + equalTo(RestStatus.BAD_REQUEST)); + + assertThat(TransportStopTransformAction.firstNotOKStatus( + Collections.singletonList(new TaskOperationFailure( + "node", + 1, + new ElasticsearchStatusException("not failure", RestStatus.OK))), + nodeFailures), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + + assertThat(TransportStopTransformAction.firstNotOKStatus( + Collections.emptyList(), + nodeFailures), + equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + } + + public void testBuildException() { + List nodeFailures = new ArrayList<>(); + List taskOperationFailures = new ArrayList<>(); + + nodeFailures.add(new ElasticsearchException("node failure")); + taskOperationFailures.add(new TaskOperationFailure("node", + 1, + new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST))); + + RestStatus status = CONFLICT; + ElasticsearchStatusException statusException = + TransportStopTransformAction.buildException(taskOperationFailures, nodeFailures, status); + + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(1)); + + statusException = TransportStopTransformAction.buildException(Collections.emptyList(), nodeFailures, status); + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(nodeFailures.get(0).getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(0)); + + statusException = TransportStopTransformAction.buildException(taskOperationFailures, Collections.emptyList(), status); + assertThat(statusException.status(), equalTo(status)); + assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage())); + assertThat(statusException.getSuppressed().length, equalTo(0)); + } + } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 99a454c5f1a31..b7589b3bcac5e 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -67,7 +67,8 @@ public void testAudiOnFinishFrequency() { 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), - parentTask); + parentTask, + false); List shouldAudit = IntStream.range(0, 100_000).boxed().map(indexer::shouldAuditOnFinish).collect(Collectors.toList()); From 346e8abfbfefda2c6e6350aa0102113e02bfe251 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 16 Oct 2019 14:16:06 -0400 Subject: [PATCH 2/5] addressing PR comments --- .../TransformInternalIndexConstants.java | 2 +- .../persistence/TransformInternalIndex.java | 3 +- .../transforms/ClientTransformIndexer.java | 37 ++++++++++++++++++- .../transform/transforms/TransformTask.java | 33 +---------------- 4 files changed, 41 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java index cb1d41d5ea21c..574499397e9fd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java @@ -24,7 +24,7 @@ public final class TransformInternalIndexConstants { // internal index // version is not a rollover pattern, however padded because sort is string based - public static final String INDEX_VERSION = "003"; + public static final String INDEX_VERSION = "004"; public static final String INDEX_PATTERN = ".transform-internal-"; public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION; public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index 274ddfaaf7a6a..bd436bde9aa2d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -54,7 +54,8 @@ public final class TransformInternalIndex { * progress::docs_processed, progress::docs_indexed, * stats::exponential_avg_checkpoint_duration_ms, stats::exponential_avg_documents_indexed, * stats::exponential_avg_documents_processed - * version 3 (7.6): state::should_stop_at_checkpoint + * + * version 4 (7.6): state::should_stop_at_checkpoint */ // constants for mappings diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index c59cba74cec65..ec37621c3ef52 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -111,6 +111,41 @@ void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint) { this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; } + void persistShouldStopAtCheckpoint(boolean shouldStopAtCheckpoint, ActionListener shouldStopAtCheckpointListener) { + if (this.shouldStopAtCheckpoint == shouldStopAtCheckpoint || + getState() == IndexerState.STOPPED || + getState() == IndexerState.STOPPING) { + shouldStopAtCheckpointListener.onResponse(null); + return; + } + TransformState state = new TransformState( + transformTask.getTaskState(), + getState(), + getPosition(), + transformTask.getCheckpoint(), + transformTask.getStateReason(), + getProgress(), + null, //Node attributes + shouldStopAtCheckpoint); + doSaveState(state, + ActionListener.wrap( + r -> { + // We only want to update this internal value if it is persisted as such + this.shouldStopAtCheckpoint = shouldStopAtCheckpoint; + logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", + getJobId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.onResponse(null); + }, + statsExc -> { + logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]", + getJobId(), + shouldStopAtCheckpoint); + shouldStopAtCheckpointListener.onFailure(statsExc); + } + )); + } + @Override protected void onStart(long now, ActionListener listener) { if (transformTask.getTaskState() == TransformTaskState.FAILED) { @@ -376,7 +411,7 @@ protected void doSaveState(IndexerState indexerState, TransformIndexerPosition p )); } - protected void doSaveState(TransformState state, ActionListener listener) { + private void doSaveState(TransformState state, ActionListener listener) { // This could be `null` but the putOrUpdateTransformStoredDoc handles that case just fine SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex = transformTask.getSeqNoPrimaryTermAndIndex(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index ca7ed16afcd7a..9df4fce257744 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -296,40 +296,11 @@ public synchronized void setShouldStopAtCheckpoint(boolean shouldStopAtCheckpoin getTransformId(), shouldStopAtCheckpoint, getState()); - if (taskState.get() == TransformTaskState.STARTED == false || - getIndexer() == null || - getIndexer().shouldStopAtCheckpoint() == shouldStopAtCheckpoint || - getIndexer().getState() == IndexerState.STOPPED || - getIndexer().getState() == IndexerState.STOPPING) { + if (taskState.get() != TransformTaskState.STARTED || getIndexer() == null) { shouldStopAtCheckpointListener.onResponse(null); return; } - TransformState state = new TransformState( - taskState.get(), - indexer.get().getState(), - indexer.get().getPosition(), - currentCheckpoint.get(), - stateReason.get(), - getIndexer().getProgress(), - null, //Node attributes - shouldStopAtCheckpoint); - getIndexer().doSaveState(state, - ActionListener.wrap( - r -> { - // We only want to update this internal value if it is persisted as such - getIndexer().setShouldStopAtCheckpoint(shouldStopAtCheckpoint); - logger.debug("[{}] successfully persisted should_stop_at_checkpoint update [{}]", - getTransformId(), - shouldStopAtCheckpoint); - shouldStopAtCheckpointListener.onResponse(null); - }, - statsExc -> { - logger.warn("[{}] failed to persist should_stop_at_checkpoint update [{}]", - getTransformId(), - shouldStopAtCheckpoint); - shouldStopAtCheckpointListener.onFailure(statsExc); - } - )); + getIndexer().persistShouldStopAtCheckpoint(shouldStopAtCheckpoint, shouldStopAtCheckpointListener); } public synchronized void stop(boolean force, boolean shouldStopAtCheckpoint) { From bf9598d8acad02b59617028d7c2b558b412dfa28 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 16 Oct 2019 15:49:11 -0400 Subject: [PATCH 3/5] bumping index version --- .../test/upgraded_cluster/80_data_frame_jobs_crud.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index 6e0b3eb69a5b5..4ba2597d09e84 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -271,6 +271,6 @@ setup: - do: indices.get_mapping: - index: .transform-internal-003 - - match: { \.transform-internal-003.mappings.dynamic: "false" } - - match: { \.transform-internal-003.mappings.properties.id.type: "keyword" } + index: .transform-internal-004 + - match: { \.transform-internal-004.mappings.dynamic: "false" } + - match: { \.transform-internal-004.mappings.properties.id.type: "keyword" } From 4aeed431ca7243de2ba2a68c95e35c11076b2f33 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Fri, 25 Oct 2019 11:29:51 -0400 Subject: [PATCH 4/5] Adjusting to show STOPPING with reason. Disallow force and wait_for_checkpoint --- .../transform/action/StopTransformAction.java | 10 ++ .../transform/transforms/TransformStats.java | 4 +- .../api/transform.stop_transform.json | 5 + .../test/transform/transforms_start_stop.yml | 9 ++ .../TransportGetTransformStatsAction.java | 36 +++-- ...TransportGetTransformStatsActionTests.java | 134 ++++++++++++++++++ 6 files changed, 183 insertions(+), 15 deletions(-) create mode 100644 x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java index 3a3d6134172bd..e0e18b10983e2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StopTransformAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.transform.action; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; @@ -32,6 +33,8 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.action.ValidateActions.addValidationError; + public class StopTransformAction extends ActionType { public static final StopTransformAction INSTANCE = new StopTransformAction(); @@ -136,6 +139,13 @@ public void writeTo(StreamOutput out) throws IOException { @Override public ActionRequestValidationException validate() { + if (force && waitForCheckpoint) { + return addValidationError(new ParameterizedMessage( + "cannot set both [{}] and [{}] to true", + TransformField.FORCE, + TransformField.WAIT_FOR_CHECKPOINT).getFormattedMessage(), + null); + } return null; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java index 4253cfdca068b..da6dd6128996e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java @@ -92,8 +92,8 @@ public static TransformStats stoppedStats(String id, TransformIndexerStats index public TransformStats(String id, State state, @Nullable String reason, - @Nullable NodeAttributes node, TransformIndexerStats stats, - TransformCheckpointingInfo checkpointingInfo) { + @Nullable NodeAttributes node, TransformIndexerStats stats, + TransformCheckpointingInfo checkpointingInfo) { this.id = Objects.requireNonNull(id); this.state = Objects.requireNonNull(state); this.reason = reason; diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json index 07867e14b09ad..5076ff736d9c6 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/transform.stop_transform.json @@ -21,6 +21,11 @@ ] }, "params": { + "force": { + "type": "boolean", + "required": false, + "description": "Whether to force stop a failed transform or not. Default to false" + }, "wait_for_completion": { "type": "boolean", "required": false, diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml index 7d5f83ee4fd16..f3ed8351803b9 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml @@ -224,6 +224,15 @@ teardown: allow_no_match: false transform_id: "missing-transform*" +--- +"Test stop transform with force and wait_for_checkpoint true ": + - do: + catch: /cannot set both \[force\] and \[wait_for_checkpoint\] to true/ + transform.stop_transform: + wait_for_checkpoint: true + force: true + transform_id: "airline-transform-start-stop-continuous" + --- "Test stop already stopped transform": - do: diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 57397cf2c0b31..16b704959118a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -93,25 +94,14 @@ protected void taskOperation(Request request, TransformTask task, ActionListener ClusterState state = clusterService.state(); String nodeId = state.nodes().getLocalNode().getId(); if (task.isCancelled() == false) { - TransformState transformState = task.getState(); task.getCheckpointingInfo(transformCheckpointService, ActionListener.wrap( checkpointingInfo -> listener.onResponse(new Response( - Collections.singletonList(new TransformStats(task.getTransformId(), - TransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()), - transformState.getReason(), - null, - task.getStats(), - checkpointingInfo)), + Collections.singletonList(deriveStats(task, checkpointingInfo)), 1L)), e -> { logger.warn("Failed to retrieve checkpointing info for transform [" + task.getTransformId() + "]", e); listener.onResponse(new Response( - Collections.singletonList(new TransformStats(task.getTransformId(), - TransformStats.State.fromComponents(transformState.getTaskState(), transformState.getIndexerState()), - transformState.getReason(), - null, - task.getStats(), - TransformCheckpointingInfo.EMPTY)), + Collections.singletonList(deriveStats(task, null)), 1L, Collections.emptyList(), Collections.singletonList(new FailedNodeException(nodeId, "Failed to retrieve checkpointing info", e)))); @@ -168,6 +158,26 @@ private static void setNodeAttributes(TransformStats transformStats, } } + static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpointingInfo checkpointingInfo) { + TransformState transformState = task.getState(); + TransformStats.State derivedState = TransformStats.State.fromComponents(transformState.getTaskState(), + transformState.getIndexerState()); + String reason = transformState.getReason(); + if (transformState.shouldStopAtNextCheckpoint() && + derivedState.equals(TransformStats.State.STOPPED) == false && + derivedState.equals(TransformStats.State.FAILED) == false) { + derivedState = TransformStats.State.STOPPING; + reason = reason.isEmpty() ? "transform is set to stop at the next checkpoint" : reason; + } + return new TransformStats( + task.getTransformId(), + derivedState, + reason, + null, + task.getStats(), + checkpointingInfo == null ? TransformCheckpointingInfo.EMPTY : checkpointingInfo); + } + private void collectStatsForTransformsWithoutTasks(Request request, Response response, ActionListener listener) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java new file mode 100644 index 0000000000000..7dbe2b4f3a2a1 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.transform.action; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests; +import org.elasticsearch.xpack.core.transform.transforms.TransformState; +import org.elasticsearch.xpack.core.transform.transforms.TransformStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.transforms.TransformTask; + +import java.time.Instant; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TransportGetTransformStatsActionTests extends ESTestCase { + + private TransformTask task = mock(TransformTask.class); + + public void testDeriveStatsStopped() { + String transformId = "transform-with-stats"; + String reason = ""; + TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); + TransformState stoppedState = + new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true); + withIdStateAndStats(transformId, stoppedState, stats); + TransformCheckpointingInfo info = new TransformCheckpointingInfo( + new TransformCheckpointStats(1, null, null, 1, 1), + new TransformCheckpointStats(2, null, null, 2, 5), + 2, + Instant.now()); + + assertThat(TransportGetTransformStatsAction.deriveStats(task, null), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, TransformCheckpointingInfo.EMPTY))); + assertThat(TransportGetTransformStatsAction.deriveStats(task, info), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, "", null, stats, info))); + + + reason = "foo"; + stoppedState = new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0, reason, null, null, true); + withIdStateAndStats(transformId, stoppedState, stats); + + assertThat(TransportGetTransformStatsAction.deriveStats(task, null), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY))); + assertThat(TransportGetTransformStatsAction.deriveStats(task, info), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info))); + } + + public void testDeriveStatsFailed() { + String transformId = "transform-with-stats"; + String reason = ""; + TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); + TransformState failedState = + new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true); + withIdStateAndStats(transformId, failedState, stats); + TransformCheckpointingInfo info = new TransformCheckpointingInfo( + new TransformCheckpointStats(1, null, null, 1, 1), + new TransformCheckpointStats(2, null, null, 2, 5), + 2, + Instant.now()); + + assertThat(TransportGetTransformStatsAction.deriveStats(task, null), + equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, TransformCheckpointingInfo.EMPTY))); + assertThat(TransportGetTransformStatsAction.deriveStats(task, info), + equalTo(new TransformStats(transformId, TransformStats.State.FAILED, "", null, stats, info))); + + + reason = "the task is failed"; + failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true); + withIdStateAndStats(transformId, failedState, stats); + + assertThat(TransportGetTransformStatsAction.deriveStats(task, null), + equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY))); + assertThat(TransportGetTransformStatsAction.deriveStats(task, info), + equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info))); + } + + + public void testDeriveStats() { + String transformId = "transform-with-stats"; + String reason = ""; + TransformIndexerStats stats = TransformIndexerStatsTests.randomStats(); + TransformState runningState = + new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true); + withIdStateAndStats(transformId, runningState, stats); + TransformCheckpointingInfo info = new TransformCheckpointingInfo( + new TransformCheckpointStats(1, null, null, 1, 1), + new TransformCheckpointStats(2, null, null, 2, 5), + 2, + Instant.now()); + + assertThat(TransportGetTransformStatsAction.deriveStats(task, null), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, + "transform is set to stop at the next checkpoint", null, stats, TransformCheckpointingInfo.EMPTY))); + assertThat(TransportGetTransformStatsAction.deriveStats(task, info), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, + "transform is set to stop at the next checkpoint", null, stats, info))); + + + reason = "foo"; + runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, true); + withIdStateAndStats(transformId, runningState, stats); + + assertThat(TransportGetTransformStatsAction.deriveStats(task, null), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, TransformCheckpointingInfo.EMPTY))); + assertThat(TransportGetTransformStatsAction.deriveStats(task, info), + equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info))); + + // Stop at next checkpoint is false. + runningState = new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, reason, null, null, false); + withIdStateAndStats(transformId, runningState, stats); + + assertThat(TransportGetTransformStatsAction.deriveStats(task, null), + equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, TransformCheckpointingInfo.EMPTY))); + assertThat(TransportGetTransformStatsAction.deriveStats(task, info), + equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info))); + } + + private void withIdStateAndStats(String transformId, TransformState state, TransformIndexerStats stats) { + when(task.getTransformId()).thenReturn(transformId); + when(task.getState()).thenReturn(state); + when(task.getStats()).thenReturn(stats); + } + +} From ec734e058101ab2e720a7f451a2a5c3d61115114 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Mon, 28 Oct 2019 10:26:37 -0400 Subject: [PATCH 5/5] removing unnecessary todos --- .../xpack/core/transform/transforms/TransformState.java | 1 - .../xpack/core/transform/transforms/TransformTaskState.java | 1 - .../xpack/transform/transforms/ClientTransformIndexer.java | 3 --- .../transform/transforms/TransformPersistentTasksExecutor.java | 1 - 4 files changed, 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java index 12a75621efa83..6517b1227481d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformState.java @@ -43,7 +43,6 @@ public class TransformState implements Task.Status, PersistentTaskState { @Nullable private NodeAttributes node; - // TODO: 8.x this needs to be deprecated and we move towards a STOPPING TASK_STATE private final boolean shouldStopAtNextCheckpoint; public static final ParseField TASK_STATE = new ParseField("task_state"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java index 0049f4a6272a8..e807dafc8b435 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskState.java @@ -14,7 +14,6 @@ import java.util.Locale; public enum TransformTaskState implements Writeable { - // TODO 8.x add a `STOPPING` state and BWC handling in ::fromString STOPPED, STARTED, FAILED; public static TransformTaskState fromString(String name) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index ec37621c3ef52..801dc413b4328 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -64,7 +64,6 @@ class ClientTransformIndexer extends TransformIndexer { // Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index private volatile String lastAuditedExceptionMessage = null; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); - // TODO should be obviated in 8.x with TransformTaskState::STOPPING private volatile boolean shouldStopAtCheckpoint = false; private volatile Instant changesLastDetectedAt; @@ -482,7 +481,6 @@ protected void onFinish(ActionListener listener) { // This indicates an early exit since no changes were found. // So, don't treat this like a checkpoint being completed, as no work was done. if (hasSourceChanged == false) { - // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING if (shouldStopAtCheckpoint) { stop(); } @@ -529,7 +527,6 @@ protected void onFinish(ActionListener listener) { logger.debug( "[{}] finished indexing for transform checkpoint [{}].", getJobId(), checkpoint); auditBulkFailures = true; - // TODO should be obviated in 8.x with DataFrameTransformTaskState::STOPPING if (shouldStopAtCheckpoint) { stop(); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 78867068b265c..df42e5dbb7a3a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -211,7 +211,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa .setInitialPosition(stateAndStats.getTransformState().getPosition()) .setProgress(stateAndStats.getTransformState().getProgress()) .setIndexerState(currentIndexerState(transformState)) - // TODO should be obviated in 8.x with TransformTaskState::STOPPING .setShouldStopAtCheckpoint(transformState.shouldStopAtNextCheckpoint()); logger.debug("[{}] Loading existing state: [{}], position [{}]", transformId,